mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-10-05 13:46:51 +08:00
Merge remote-tracking branch 'origin/feat/workspaces' into feat/integrate.workflow.changes
This commit is contained in:
@@ -387,6 +387,76 @@ func (c *Client) CountCronWorkflows(namespace, workflowTemplateUID string) (coun
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) buildCronWorkflowDefinition(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (cronWorkflow *wfv1.CronWorkflow, err error) {
|
||||||
|
if opts == nil {
|
||||||
|
opts = &WorkflowExecutionOptions{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.Name != "" {
|
||||||
|
cwf.ObjectMeta.Name = opts.Name
|
||||||
|
}
|
||||||
|
if opts.GenerateName != "" {
|
||||||
|
cwf.ObjectMeta.GenerateName = opts.GenerateName
|
||||||
|
}
|
||||||
|
if opts.Entrypoint != "" {
|
||||||
|
cwf.Spec.WorkflowSpec.Entrypoint = opts.Entrypoint
|
||||||
|
}
|
||||||
|
if opts.ServiceAccount != "" {
|
||||||
|
cwf.Spec.WorkflowSpec.ServiceAccountName = opts.ServiceAccount
|
||||||
|
}
|
||||||
|
if len(opts.Parameters) > 0 {
|
||||||
|
newParams := make([]wfv1.Parameter, 0)
|
||||||
|
passedParams := make(map[string]bool)
|
||||||
|
for _, param := range opts.Parameters {
|
||||||
|
newParams = append(newParams, wfv1.Parameter{
|
||||||
|
Name: param.Name,
|
||||||
|
Value: param.Value,
|
||||||
|
})
|
||||||
|
passedParams[param.Name] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, param := range cwf.Spec.WorkflowSpec.Arguments.Parameters {
|
||||||
|
if _, ok := passedParams[param.Name]; ok {
|
||||||
|
// this parameter was overridden via command line
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newParams = append(newParams, param)
|
||||||
|
}
|
||||||
|
cwf.Spec.WorkflowSpec.Arguments.Parameters = newParams
|
||||||
|
wf.Spec.Arguments.Parameters = newParams
|
||||||
|
}
|
||||||
|
if opts.Labels != nil {
|
||||||
|
cwf.ObjectMeta.Labels = *opts.Labels
|
||||||
|
}
|
||||||
|
|
||||||
|
err = injectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = injectInitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cwf.Spec.WorkflowSpec = wf.Spec
|
||||||
|
cwf.Spec.WorkflowMetadata = &wf.ObjectMeta
|
||||||
|
|
||||||
|
//merge the labels
|
||||||
|
mergedLabels := wf.ObjectMeta.Labels
|
||||||
|
if mergedLabels == nil {
|
||||||
|
mergedLabels = make(map[string]string)
|
||||||
|
}
|
||||||
|
for k, v := range *opts.Labels {
|
||||||
|
mergedLabels[k] = v
|
||||||
|
}
|
||||||
|
cwf.Spec.WorkflowMetadata.Labels = mergedLabels
|
||||||
|
|
||||||
|
return cwf, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) updateCronWorkflow(namespace string, name string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (updatedCronWorkflow *wfv1.CronWorkflow, err error) {
|
func (c *Client) updateCronWorkflow(namespace string, name string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (updatedCronWorkflow *wfv1.CronWorkflow, err error) {
|
||||||
//Make sure the CronWorkflow exists before we edit it
|
//Make sure the CronWorkflow exists before we edit it
|
||||||
toUpdateCWF, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{})
|
toUpdateCWF, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{})
|
||||||
@@ -399,71 +469,10 @@ func (c *Client) updateCronWorkflow(namespace string, name string, workflowTempl
|
|||||||
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
|
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts == nil {
|
cwf, err = c.buildCronWorkflowDefinition(namespace, workflowTemplateId, wf, cwf, opts)
|
||||||
opts = &WorkflowExecutionOptions{}
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Name != "" {
|
|
||||||
cwf.ObjectMeta.Name = opts.Name
|
|
||||||
}
|
|
||||||
if opts.GenerateName != "" {
|
|
||||||
cwf.ObjectMeta.GenerateName = opts.GenerateName
|
|
||||||
}
|
|
||||||
if opts.Entrypoint != "" {
|
|
||||||
cwf.Spec.WorkflowSpec.Entrypoint = opts.Entrypoint
|
|
||||||
}
|
|
||||||
if opts.ServiceAccount != "" {
|
|
||||||
cwf.Spec.WorkflowSpec.ServiceAccountName = opts.ServiceAccount
|
|
||||||
}
|
|
||||||
if len(opts.Parameters) > 0 {
|
|
||||||
newParams := make([]wfv1.Parameter, 0)
|
|
||||||
passedParams := make(map[string]bool)
|
|
||||||
for _, param := range opts.Parameters {
|
|
||||||
newParams = append(newParams, wfv1.Parameter{
|
|
||||||
Name: param.Name,
|
|
||||||
Value: param.Value,
|
|
||||||
})
|
|
||||||
passedParams[param.Name] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, param := range cwf.Spec.WorkflowSpec.Arguments.Parameters {
|
|
||||||
if _, ok := passedParams[param.Name]; ok {
|
|
||||||
// this parameter was overridden via command line
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
newParams = append(newParams, param)
|
|
||||||
}
|
|
||||||
cwf.Spec.WorkflowSpec.Arguments.Parameters = newParams
|
|
||||||
wf.Spec.Arguments.Parameters = newParams
|
|
||||||
}
|
|
||||||
if opts.Labels != nil {
|
|
||||||
cwf.ObjectMeta.Labels = *opts.Labels
|
|
||||||
}
|
|
||||||
|
|
||||||
err = injectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return
|
||||||
}
|
}
|
||||||
err = injectInitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cwf.Spec.WorkflowSpec = wf.Spec
|
|
||||||
cwf.Spec.WorkflowMetadata = &wf.ObjectMeta
|
|
||||||
|
|
||||||
//merge the labels
|
|
||||||
mergedLabels := wf.ObjectMeta.Labels
|
|
||||||
if mergedLabels == nil {
|
|
||||||
mergedLabels = make(map[string]string)
|
|
||||||
}
|
|
||||||
for k, v := range *opts.Labels {
|
|
||||||
mergedLabels[k] = v
|
|
||||||
}
|
|
||||||
cwf.Spec.WorkflowMetadata.Labels = mergedLabels
|
|
||||||
|
|
||||||
cwf.Name = name
|
cwf.Name = name
|
||||||
cwf.ResourceVersion = toUpdateCWF.ResourceVersion
|
cwf.ResourceVersion = toUpdateCWF.ResourceVersion
|
||||||
@@ -476,70 +485,11 @@ func (c *Client) updateCronWorkflow(namespace string, name string, workflowTempl
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (createdCronWorkflow *wfv1.CronWorkflow, err error) {
|
func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (createdCronWorkflow *wfv1.CronWorkflow, err error) {
|
||||||
if opts == nil {
|
cwf, err = c.buildCronWorkflowDefinition(namespace, workflowTemplateId, wf, cwf, opts)
|
||||||
opts = &WorkflowExecutionOptions{}
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.Name != "" {
|
|
||||||
cwf.ObjectMeta.Name = opts.Name
|
|
||||||
}
|
|
||||||
if opts.GenerateName != "" {
|
|
||||||
cwf.ObjectMeta.GenerateName = opts.GenerateName
|
|
||||||
}
|
|
||||||
if opts.Entrypoint != "" {
|
|
||||||
cwf.Spec.WorkflowSpec.Entrypoint = opts.Entrypoint
|
|
||||||
}
|
|
||||||
if opts.ServiceAccount != "" {
|
|
||||||
cwf.Spec.WorkflowSpec.ServiceAccountName = opts.ServiceAccount
|
|
||||||
}
|
|
||||||
if len(opts.Parameters) > 0 {
|
|
||||||
newParams := make([]wfv1.Parameter, 0)
|
|
||||||
passedParams := make(map[string]bool)
|
|
||||||
for _, param := range opts.Parameters {
|
|
||||||
newParams = append(newParams, wfv1.Parameter{
|
|
||||||
Name: param.Name,
|
|
||||||
Value: param.Value,
|
|
||||||
})
|
|
||||||
passedParams[param.Name] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, param := range cwf.Spec.WorkflowSpec.Arguments.Parameters {
|
|
||||||
if _, ok := passedParams[param.Name]; ok {
|
|
||||||
// this parameter was overridden via command line
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
newParams = append(newParams, param)
|
|
||||||
}
|
|
||||||
cwf.Spec.WorkflowSpec.Arguments.Parameters = newParams
|
|
||||||
wf.Spec.Arguments.Parameters = newParams
|
|
||||||
}
|
|
||||||
if opts.Labels != nil {
|
|
||||||
cwf.ObjectMeta.Labels = *opts.Labels
|
|
||||||
}
|
|
||||||
err = injectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return
|
||||||
}
|
|
||||||
err = injectInitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cwf.Spec.WorkflowSpec = wf.Spec
|
|
||||||
cwf.Spec.WorkflowMetadata = &wf.ObjectMeta
|
|
||||||
|
|
||||||
//merge the labels
|
|
||||||
mergedLabels := wf.ObjectMeta.Labels
|
|
||||||
if mergedLabels == nil {
|
|
||||||
mergedLabels = make(map[string]string)
|
|
||||||
}
|
|
||||||
for k, v := range *opts.Labels {
|
|
||||||
mergedLabels[k] = v
|
|
||||||
}
|
|
||||||
cwf.Spec.WorkflowMetadata.Labels = mergedLabels
|
|
||||||
createdCronWorkflow, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Create(cwf)
|
createdCronWorkflow, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Create(cwf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/onepanelio/core/pkg/util/ptr"
|
"github.com/onepanelio/core/pkg/util/ptr"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -86,6 +87,45 @@ func UnmarshalWorkflows(wfBytes []byte, strict bool) (wfs []wfv1.Workflow, err e
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addSystemUIDParameter(wf *wfv1.Workflow) {
|
||||||
|
for _, p := range wf.Spec.Arguments.Parameters {
|
||||||
|
if p.Name == "sys-uid" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
uid := wf.Labels[label.WorkflowUid]
|
||||||
|
if uid == "" {
|
||||||
|
uid = "00000000-0000-0000-0000-000000000000"
|
||||||
|
}
|
||||||
|
if wf.Spec.Arguments.Parameters == nil {
|
||||||
|
wf.Spec.Arguments.Parameters = make([]wfv1.Parameter, 0)
|
||||||
|
}
|
||||||
|
wf.Spec.Arguments.Parameters = append(wf.Spec.Arguments.Parameters, wfv1.Parameter{
|
||||||
|
Name: "sys-uid",
|
||||||
|
Value: ptr.String(uid),
|
||||||
|
})
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func addEnvToTemplate(template *wfv1.Template, key string, value string) {
|
||||||
|
//Flag to prevent over-writing user's envs
|
||||||
|
overwriteUserEnv := true
|
||||||
|
for _, templateEnv := range template.Container.Env {
|
||||||
|
if templateEnv.Name == key {
|
||||||
|
overwriteUserEnv = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if overwriteUserEnv {
|
||||||
|
template.Container.Env = append(template.Container.Env, corev1.EnvVar{
|
||||||
|
Name: key,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (err error) {
|
func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (err error) {
|
||||||
if opts.PodGCStrategy == nil {
|
if opts.PodGCStrategy == nil {
|
||||||
if wf.Spec.PodGC == nil {
|
if wf.Spec.PodGC == nil {
|
||||||
@@ -102,6 +142,8 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addSystemUIDParameter(wf)
|
||||||
|
|
||||||
addSecretValsToTemplate := true
|
addSecretValsToTemplate := true
|
||||||
secret, err := c.GetSecret(namespace, "onepanel-default-env")
|
secret, err := c.GetSecret(namespace, "onepanel-default-env")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -181,23 +223,6 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func addEnvToTemplate(template *wfv1.Template, key string, value string) {
|
|
||||||
//Flag to prevent over-writing user's envs
|
|
||||||
overwriteUserEnv := true
|
|
||||||
for _, templateEnv := range template.Container.Env {
|
|
||||||
if templateEnv.Name == key {
|
|
||||||
overwriteUserEnv = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if overwriteUserEnv {
|
|
||||||
template.Container.Env = append(template.Container.Env, corev1.EnvVar{
|
|
||||||
Name: key,
|
|
||||||
Value: value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (newDbId uint64, createdWorkflow *wfv1.Workflow, err error) {
|
func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (newDbId uint64, createdWorkflow *wfv1.Workflow, err error) {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = &WorkflowExecutionOptions{}
|
opts = &WorkflowExecutionOptions{}
|
||||||
@@ -383,7 +408,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
|
|||||||
workflow.ID = id
|
workflow.ID = id
|
||||||
workflow.Name = createdWorkflow.Name
|
workflow.Name = createdWorkflow.Name
|
||||||
workflow.CreatedAt = createdWorkflow.CreationTimestamp.UTC()
|
workflow.CreatedAt = createdWorkflow.CreationTimestamp.UTC()
|
||||||
workflow.UID = string(createdWorkflow.ObjectMeta.UID)
|
workflow.UID = workflowUid
|
||||||
workflow.WorkflowTemplate = workflowTemplate
|
workflow.WorkflowTemplate = workflowTemplate
|
||||||
// Manifests could get big, don't return them in this case.
|
// Manifests could get big, don't return them in this case.
|
||||||
workflow.WorkflowTemplate.Manifest = ""
|
workflow.WorkflowTemplate.Manifest = ""
|
||||||
@@ -1283,7 +1308,7 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ..
|
|||||||
Will build a template that makes a CURL request to the onepanel-core API,
|
Will build a template that makes a CURL request to the onepanel-core API,
|
||||||
with statistics about the workflow that was just executed.
|
with statistics about the workflow that was just executed.
|
||||||
*/
|
*/
|
||||||
func getCURLNodeTemplate(name, curlPath, curlBody string) (template *wfv1.Template, err error) {
|
func getCURLNodeTemplate(name, curlMethod, curlPath, curlBody string, inputs wfv1.Inputs) (template *wfv1.Template, err error) {
|
||||||
host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "onepanel-core.onepanel.svc.cluster.local")
|
host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "onepanel-core.onepanel.svc.cluster.local")
|
||||||
if host == "" {
|
if host == "" {
|
||||||
err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.")
|
err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.")
|
||||||
@@ -1296,13 +1321,15 @@ func getCURLNodeTemplate(name, curlPath, curlBody string) (template *wfv1.Templa
|
|||||||
}
|
}
|
||||||
endpoint := fmt.Sprintf("http://%s:%s%s", host, port, curlPath)
|
endpoint := fmt.Sprintf("http://%s:%s%s", host, port, curlPath)
|
||||||
template = &wfv1.Template{
|
template = &wfv1.Template{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
Inputs: inputs,
|
||||||
Container: &corev1.Container{
|
Container: &corev1.Container{
|
||||||
Name: "curl",
|
Name: "curl",
|
||||||
Image: "curlimages/curl",
|
Image: "curlimages/curl",
|
||||||
Command: []string{"sh", "-c"},
|
Command: []string{"sh", "-c"},
|
||||||
Args: []string{
|
Args: []string{
|
||||||
"SERVICE_ACCOUNT_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token) && curl -s -o /dev/null -w '%{http_code}' '" + endpoint + "' -H \"Content-Type: application/json\" -H 'Connection: keep-alive' -H 'Accept: application/json' " +
|
"SERVICE_ACCOUNT_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token) " +
|
||||||
|
"&& curl -X " + curlMethod + " -s -o /dev/null -w '%{http_code}' '" + endpoint + "' -H \"Content-Type: application/json\" -H 'Connection: keep-alive' -H 'Accept: application/json' " +
|
||||||
"-H 'Authorization: Bearer '\"$SERVICE_ACCOUNT_TOKEN\"'' " +
|
"-H 'Authorization: Bearer '\"$SERVICE_ACCOUNT_TOKEN\"'' " +
|
||||||
"--data '" + curlBody + "' --compressed",
|
"--data '" + curlBody + "' --compressed",
|
||||||
},
|
},
|
||||||
@@ -1321,7 +1348,7 @@ func injectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace st
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
statsTemplate, err := getCURLNodeTemplate("sys-send-exit-stats", curlPath, string(statisticsBytes))
|
statsTemplate, err := getCURLNodeTemplate("sys-send-exit-stats", http.MethodPost, curlPath, string(statisticsBytes), wfv1.Inputs{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -1367,7 +1394,7 @@ func injectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace st
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
containerTemplate, err := getCURLNodeTemplate("sys-send-init-stats", curlPath, string(statisticsBytes))
|
containerTemplate, err := getCURLNodeTemplate("sys-send-init-stats", http.MethodPost, curlPath, string(statisticsBytes), wfv1.Inputs{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@@ -14,6 +14,7 @@ import (
|
|||||||
networking "istio.io/api/networking/v1alpha3"
|
networking "istio.io/api/networking/v1alpha3"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"net/http"
|
||||||
"sigs.k8s.io/yaml"
|
"sigs.k8s.io/yaml"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -41,12 +42,6 @@ func generateArguments(spec *WorkspaceSpec, config map[string]string) (err error
|
|||||||
})
|
})
|
||||||
|
|
||||||
// TODO: These can be removed when lint validation of workflows work
|
// TODO: These can be removed when lint validation of workflows work
|
||||||
// Workspace UID
|
|
||||||
spec.Arguments.Parameters = append(spec.Arguments.Parameters, Parameter{
|
|
||||||
Name: "sys-uid",
|
|
||||||
Value: ptr.String("00000000-0000-0000-0000-000000000000"),
|
|
||||||
Type: "input.hidden",
|
|
||||||
})
|
|
||||||
// Resource action parameter
|
// Resource action parameter
|
||||||
spec.Arguments.Parameters = append(spec.Arguments.Parameters, Parameter{
|
spec.Arguments.Parameters = append(spec.Arguments.Parameters, Parameter{
|
||||||
Name: "sys-resource-action",
|
Name: "sys-resource-action",
|
||||||
@@ -287,6 +282,48 @@ metadata:
|
|||||||
When: "{{workflow.parameters.sys-workspace-action}} == delete",
|
When: "{{workflow.parameters.sys-workspace-action}} == delete",
|
||||||
WithItems: volumeClaimItems,
|
WithItems: volumeClaimItems,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "sys-set-phase-running",
|
||||||
|
Template: "sys-update-status",
|
||||||
|
Dependencies: []string{"stateful-set"},
|
||||||
|
Arguments: wfv1.Arguments{
|
||||||
|
Parameters: []wfv1.Parameter{
|
||||||
|
{
|
||||||
|
Name: "sys-workspace-phase",
|
||||||
|
Value: ptr.String(string(WorkspaceRunning)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
When: "{{workflow.parameters.sys-workspace-action}} == create",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "sys-set-phase-pausing",
|
||||||
|
Template: "sys-update-status",
|
||||||
|
Dependencies: []string{"delete-stateful-set"},
|
||||||
|
Arguments: wfv1.Arguments{
|
||||||
|
Parameters: []wfv1.Parameter{
|
||||||
|
{
|
||||||
|
Name: "sys-workspace-phase",
|
||||||
|
Value: ptr.String(string(WorkspacePausing)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
When: "{{workflow.parameters.sys-workspace-action}} == pause",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "sys-set-phase-terminating",
|
||||||
|
Template: "sys-update-status",
|
||||||
|
Dependencies: []string{"delete-pvc"},
|
||||||
|
Arguments: wfv1.Arguments{
|
||||||
|
Parameters: []wfv1.Parameter{
|
||||||
|
{
|
||||||
|
Name: "sys-workspace-phase",
|
||||||
|
Value: ptr.String(string(WorkspaceTerminating)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
When: "{{workflow.parameters.sys-workspace-action}} == delete",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: spec.PostExecutionWorkflow.Entrypoint,
|
Name: spec.PostExecutionWorkflow.Entrypoint,
|
||||||
Template: spec.PostExecutionWorkflow.Entrypoint,
|
Template: spec.PostExecutionWorkflow.Entrypoint,
|
||||||
@@ -335,24 +372,30 @@ metadata:
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
// Add curl template
|
||||||
curlPath := fmt.Sprintf("/apis/v1beta1/{{workflow.namespace}}/workspaces/{{workflow.parameters.sys-uid}}/status")
|
curlPath := fmt.Sprintf("/apis/v1beta1/{{workflow.namespace}}/workspaces/{{workflow.parameters.sys-uid}}/status")
|
||||||
status := map[string]interface{}{
|
status := map[string]interface{}{
|
||||||
"phase": "{{input.parameters.phase}}",
|
"phase": "{{inputs.parameters.sys-workspace-phase}}",
|
||||||
}
|
}
|
||||||
statusBytes, err := json.Marshal(status)
|
statusBytes, err := json.Marshal(status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
curlNodeTemplate, err := getCURLNodeTemplate("update-workspace-status", curlPath, string(statusBytes))
|
inputs := wfv1.Inputs{
|
||||||
|
Parameters: []wfv1.Parameter{
|
||||||
|
{Name: "sys-workspace-phase"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
curlNodeTemplate, err := getCURLNodeTemplate("sys-update-status", http.MethodPut, curlPath, string(statusBytes), inputs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
templates = append(templates, *curlNodeTemplate)
|
templates = append(templates, *curlNodeTemplate)
|
||||||
|
// Add postExecutionWorkflow if it exists
|
||||||
if spec.PostExecutionWorkflow != nil {
|
if spec.PostExecutionWorkflow != nil {
|
||||||
templates = append(templates, spec.PostExecutionWorkflow.Templates...)
|
templates = append(templates, spec.PostExecutionWorkflow.Templates...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Consider storing this as a Go template in a "settings" database table
|
|
||||||
workflowTemplateSpec := map[string]interface{}{
|
workflowTemplateSpec := map[string]interface{}{
|
||||||
"arguments": spec.Arguments,
|
"arguments": spec.Arguments,
|
||||||
"entrypoint": "workspace",
|
"entrypoint": "workspace",
|
||||||
|
@@ -78,6 +78,9 @@ func IsAuthorized(c *v1.Client, namespace, verb, group, resource, name string) (
|
|||||||
return false, status.Error(codes.PermissionDenied, "Permission denied.")
|
return false, status.Error(codes.PermissionDenied, "Permission denied.")
|
||||||
}
|
}
|
||||||
allowed = review.Status.Allowed
|
allowed = review.Status.Allowed
|
||||||
|
if !allowed {
|
||||||
|
return false, status.Error(codes.PermissionDenied, "Permission denied.")
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@@ -33,7 +33,7 @@ func NewWorkspaceServer() *WorkspaceServer {
|
|||||||
|
|
||||||
func (s *WorkspaceServer) CreateWorkspace(ctx context.Context, req *api.CreateWorkspaceRequest) (*api.Workspace, error) {
|
func (s *WorkspaceServer) CreateWorkspace(ctx context.Context, req *api.CreateWorkspaceRequest) (*api.Workspace, error) {
|
||||||
client := ctx.Value("kubeClient").(*v1.Client)
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "apps/v1", "statefulsets", "")
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "apps", "statefulsets", "")
|
||||||
if err != nil || !allowed {
|
if err != nil || !allowed {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -63,7 +63,7 @@ func (s *WorkspaceServer) CreateWorkspace(ctx context.Context, req *api.CreateWo
|
|||||||
|
|
||||||
func (s *WorkspaceServer) UpdateWorkspaceStatus(ctx context.Context, req *api.UpdateWorkspaceStatusRequest) (*empty.Empty, error) {
|
func (s *WorkspaceServer) UpdateWorkspaceStatus(ctx context.Context, req *api.UpdateWorkspaceStatusRequest) (*empty.Empty, error) {
|
||||||
client := ctx.Value("kubeClient").(*v1.Client)
|
client := ctx.Value("kubeClient").(*v1.Client)
|
||||||
allowed, err := auth.IsAuthorized(client, req.Namespace, "update", "apps/v1", "statefulsets", "")
|
allowed, err := auth.IsAuthorized(client, req.Namespace, "update", "apps", "statefulsets", "")
|
||||||
if err != nil || !allowed {
|
if err != nil || !allowed {
|
||||||
return &empty.Empty{}, err
|
return &empty.Empty{}, err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user