diff --git a/pkg/cron_workflow.go b/pkg/cron_workflow.go index 91f2aa5..4bb28d8 100644 --- a/pkg/cron_workflow.go +++ b/pkg/cron_workflow.go @@ -387,6 +387,76 @@ func (c *Client) CountCronWorkflows(namespace, workflowTemplateUID string) (coun return } +func (c *Client) buildCronWorkflowDefinition(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (cronWorkflow *wfv1.CronWorkflow, err error) { + if opts == nil { + opts = &WorkflowExecutionOptions{} + } + + if opts.Name != "" { + cwf.ObjectMeta.Name = opts.Name + } + if opts.GenerateName != "" { + cwf.ObjectMeta.GenerateName = opts.GenerateName + } + if opts.Entrypoint != "" { + cwf.Spec.WorkflowSpec.Entrypoint = opts.Entrypoint + } + if opts.ServiceAccount != "" { + cwf.Spec.WorkflowSpec.ServiceAccountName = opts.ServiceAccount + } + if len(opts.Parameters) > 0 { + newParams := make([]wfv1.Parameter, 0) + passedParams := make(map[string]bool) + for _, param := range opts.Parameters { + newParams = append(newParams, wfv1.Parameter{ + Name: param.Name, + Value: param.Value, + }) + passedParams[param.Name] = true + } + + for _, param := range cwf.Spec.WorkflowSpec.Arguments.Parameters { + if _, ok := passedParams[param.Name]; ok { + // this parameter was overridden via command line + continue + } + newParams = append(newParams, param) + } + cwf.Spec.WorkflowSpec.Arguments.Parameters = newParams + wf.Spec.Arguments.Parameters = newParams + } + if opts.Labels != nil { + cwf.ObjectMeta.Labels = *opts.Labels + } + + err = injectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId) + if err != nil { + return nil, err + } + err = injectInitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId) + if err != nil { + return nil, err + } + if err = c.injectAutomatedFields(namespace, wf, opts); err != nil { + return nil, err + } + + cwf.Spec.WorkflowSpec = wf.Spec + cwf.Spec.WorkflowMetadata = &wf.ObjectMeta + + //merge the labels + mergedLabels := wf.ObjectMeta.Labels + if mergedLabels == nil { + mergedLabels = make(map[string]string) + } + for k, v := range *opts.Labels { + mergedLabels[k] = v + } + cwf.Spec.WorkflowMetadata.Labels = mergedLabels + + return cwf, nil +} + func (c *Client) updateCronWorkflow(namespace string, name string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (updatedCronWorkflow *wfv1.CronWorkflow, err error) { //Make sure the CronWorkflow exists before we edit it toUpdateCWF, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{}) @@ -399,71 +469,10 @@ func (c *Client) updateCronWorkflow(namespace string, name string, workflowTempl return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.") } - if opts == nil { - opts = &WorkflowExecutionOptions{} - } - - if opts.Name != "" { - cwf.ObjectMeta.Name = opts.Name - } - if opts.GenerateName != "" { - cwf.ObjectMeta.GenerateName = opts.GenerateName - } - if opts.Entrypoint != "" { - cwf.Spec.WorkflowSpec.Entrypoint = opts.Entrypoint - } - if opts.ServiceAccount != "" { - cwf.Spec.WorkflowSpec.ServiceAccountName = opts.ServiceAccount - } - if len(opts.Parameters) > 0 { - newParams := make([]wfv1.Parameter, 0) - passedParams := make(map[string]bool) - for _, param := range opts.Parameters { - newParams = append(newParams, wfv1.Parameter{ - Name: param.Name, - Value: param.Value, - }) - passedParams[param.Name] = true - } - - for _, param := range cwf.Spec.WorkflowSpec.Arguments.Parameters { - if _, ok := passedParams[param.Name]; ok { - // this parameter was overridden via command line - continue - } - newParams = append(newParams, param) - } - cwf.Spec.WorkflowSpec.Arguments.Parameters = newParams - wf.Spec.Arguments.Parameters = newParams - } - if opts.Labels != nil { - cwf.ObjectMeta.Labels = *opts.Labels - } - - err = injectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId) + cwf, err = c.buildCronWorkflowDefinition(namespace, workflowTemplateId, wf, cwf, opts) if err != nil { - return nil, err + return } - err = injectInitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId) - if err != nil { - return nil, err - } - if err = c.injectAutomatedFields(namespace, wf, opts); err != nil { - return nil, err - } - - cwf.Spec.WorkflowSpec = wf.Spec - cwf.Spec.WorkflowMetadata = &wf.ObjectMeta - - //merge the labels - mergedLabels := wf.ObjectMeta.Labels - if mergedLabels == nil { - mergedLabels = make(map[string]string) - } - for k, v := range *opts.Labels { - mergedLabels[k] = v - } - cwf.Spec.WorkflowMetadata.Labels = mergedLabels cwf.Name = name cwf.ResourceVersion = toUpdateCWF.ResourceVersion @@ -476,70 +485,11 @@ func (c *Client) updateCronWorkflow(namespace string, name string, workflowTempl } func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (createdCronWorkflow *wfv1.CronWorkflow, err error) { - if opts == nil { - opts = &WorkflowExecutionOptions{} - } - - if opts.Name != "" { - cwf.ObjectMeta.Name = opts.Name - } - if opts.GenerateName != "" { - cwf.ObjectMeta.GenerateName = opts.GenerateName - } - if opts.Entrypoint != "" { - cwf.Spec.WorkflowSpec.Entrypoint = opts.Entrypoint - } - if opts.ServiceAccount != "" { - cwf.Spec.WorkflowSpec.ServiceAccountName = opts.ServiceAccount - } - if len(opts.Parameters) > 0 { - newParams := make([]wfv1.Parameter, 0) - passedParams := make(map[string]bool) - for _, param := range opts.Parameters { - newParams = append(newParams, wfv1.Parameter{ - Name: param.Name, - Value: param.Value, - }) - passedParams[param.Name] = true - } - - for _, param := range cwf.Spec.WorkflowSpec.Arguments.Parameters { - if _, ok := passedParams[param.Name]; ok { - // this parameter was overridden via command line - continue - } - newParams = append(newParams, param) - } - cwf.Spec.WorkflowSpec.Arguments.Parameters = newParams - wf.Spec.Arguments.Parameters = newParams - } - if opts.Labels != nil { - cwf.ObjectMeta.Labels = *opts.Labels - } - err = injectExitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId) + cwf, err = c.buildCronWorkflowDefinition(namespace, workflowTemplateId, wf, cwf, opts) if err != nil { - return nil, err - } - err = injectInitHandlerWorkflowExecutionStatistic(wf, namespace, workflowTemplateId) - if err != nil { - return nil, err - } - if err = c.injectAutomatedFields(namespace, wf, opts); err != nil { - return nil, err + return } - cwf.Spec.WorkflowSpec = wf.Spec - cwf.Spec.WorkflowMetadata = &wf.ObjectMeta - - //merge the labels - mergedLabels := wf.ObjectMeta.Labels - if mergedLabels == nil { - mergedLabels = make(map[string]string) - } - for k, v := range *opts.Labels { - mergedLabels[k] = v - } - cwf.Spec.WorkflowMetadata.Labels = mergedLabels createdCronWorkflow, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Create(cwf) if err != nil { return nil, err diff --git a/pkg/workflow_execution.go b/pkg/workflow_execution.go index bbf4113..5caf31b 100644 --- a/pkg/workflow_execution.go +++ b/pkg/workflow_execution.go @@ -14,6 +14,7 @@ import ( "github.com/onepanelio/core/pkg/util/ptr" "io" "io/ioutil" + "net/http" "regexp" "strconv" "strings" @@ -86,6 +87,45 @@ func UnmarshalWorkflows(wfBytes []byte, strict bool) (wfs []wfv1.Workflow, err e return } +func addSystemUIDParameter(wf *wfv1.Workflow) { + for _, p := range wf.Spec.Arguments.Parameters { + if p.Name == "sys-uid" { + return + } + } + + uid := wf.Labels[label.WorkflowUid] + if uid == "" { + uid = "00000000-0000-0000-0000-000000000000" + } + if wf.Spec.Arguments.Parameters == nil { + wf.Spec.Arguments.Parameters = make([]wfv1.Parameter, 0) + } + wf.Spec.Arguments.Parameters = append(wf.Spec.Arguments.Parameters, wfv1.Parameter{ + Name: "sys-uid", + Value: ptr.String(uid), + }) + + return +} + +func addEnvToTemplate(template *wfv1.Template, key string, value string) { + //Flag to prevent over-writing user's envs + overwriteUserEnv := true + for _, templateEnv := range template.Container.Env { + if templateEnv.Name == key { + overwriteUserEnv = false + break + } + } + if overwriteUserEnv { + template.Container.Env = append(template.Container.Env, corev1.EnvVar{ + Name: key, + Value: value, + }) + } +} + func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (err error) { if opts.PodGCStrategy == nil { if wf.Spec.PodGC == nil { @@ -102,6 +142,8 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts } } + addSystemUIDParameter(wf) + addSecretValsToTemplate := true secret, err := c.GetSecret(namespace, "onepanel-default-env") if err != nil { @@ -181,23 +223,6 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts return } -func addEnvToTemplate(template *wfv1.Template, key string, value string) { - //Flag to prevent over-writing user's envs - overwriteUserEnv := true - for _, templateEnv := range template.Container.Env { - if templateEnv.Name == key { - overwriteUserEnv = false - break - } - } - if overwriteUserEnv { - template.Container.Env = append(template.Container.Env, corev1.EnvVar{ - Name: key, - Value: value, - }) - } -} - func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (newDbId uint64, createdWorkflow *wfv1.Workflow, err error) { if opts == nil { opts = &WorkflowExecutionOptions{} @@ -383,7 +408,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe workflow.ID = id workflow.Name = createdWorkflow.Name workflow.CreatedAt = createdWorkflow.CreationTimestamp.UTC() - workflow.UID = string(createdWorkflow.ObjectMeta.UID) + workflow.UID = workflowUid workflow.WorkflowTemplate = workflowTemplate // Manifests could get big, don't return them in this case. workflow.WorkflowTemplate.Manifest = "" @@ -1283,7 +1308,7 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates .. Will build a template that makes a CURL request to the onepanel-core API, with statistics about the workflow that was just executed. */ -func getCURLNodeTemplate(name, curlPath, curlBody string) (template *wfv1.Template, err error) { +func getCURLNodeTemplate(name, curlMethod, curlPath, curlBody string, inputs wfv1.Inputs) (template *wfv1.Template, err error) { host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "onepanel-core.onepanel.svc.cluster.local") if host == "" { err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.") @@ -1296,13 +1321,15 @@ func getCURLNodeTemplate(name, curlPath, curlBody string) (template *wfv1.Templa } endpoint := fmt.Sprintf("http://%s:%s%s", host, port, curlPath) template = &wfv1.Template{ - Name: name, + Name: name, + Inputs: inputs, Container: &corev1.Container{ Name: "curl", Image: "curlimages/curl", Command: []string{"sh", "-c"}, Args: []string{ - "SERVICE_ACCOUNT_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token) && curl -s -o /dev/null -w '%{http_code}' '" + endpoint + "' -H \"Content-Type: application/json\" -H 'Connection: keep-alive' -H 'Accept: application/json' " + + "SERVICE_ACCOUNT_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token) " + + "&& curl -X " + curlMethod + " -s -o /dev/null -w '%{http_code}' '" + endpoint + "' -H \"Content-Type: application/json\" -H 'Connection: keep-alive' -H 'Accept: application/json' " + "-H 'Authorization: Bearer '\"$SERVICE_ACCOUNT_TOKEN\"'' " + "--data '" + curlBody + "' --compressed", }, @@ -1321,7 +1348,7 @@ func injectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace st if err != nil { return err } - statsTemplate, err := getCURLNodeTemplate("sys-send-exit-stats", curlPath, string(statisticsBytes)) + statsTemplate, err := getCURLNodeTemplate("sys-send-exit-stats", http.MethodPost, curlPath, string(statisticsBytes), wfv1.Inputs{}) if err != nil { return err } @@ -1367,7 +1394,7 @@ func injectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace st if err != nil { return err } - containerTemplate, err := getCURLNodeTemplate("sys-send-init-stats", curlPath, string(statisticsBytes)) + containerTemplate, err := getCURLNodeTemplate("sys-send-init-stats", http.MethodPost, curlPath, string(statisticsBytes), wfv1.Inputs{}) if err != nil { return err } diff --git a/pkg/workspace_template.go b/pkg/workspace_template.go index da86e27..a6a46eb 100644 --- a/pkg/workspace_template.go +++ b/pkg/workspace_template.go @@ -14,6 +14,7 @@ import ( networking "istio.io/api/networking/v1alpha3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "net/http" "sigs.k8s.io/yaml" ) @@ -41,12 +42,6 @@ func generateArguments(spec *WorkspaceSpec, config map[string]string) (err error }) // TODO: These can be removed when lint validation of workflows work - // Workspace UID - spec.Arguments.Parameters = append(spec.Arguments.Parameters, Parameter{ - Name: "sys-uid", - Value: ptr.String("00000000-0000-0000-0000-000000000000"), - Type: "input.hidden", - }) // Resource action parameter spec.Arguments.Parameters = append(spec.Arguments.Parameters, Parameter{ Name: "sys-resource-action", @@ -287,6 +282,48 @@ metadata: When: "{{workflow.parameters.sys-workspace-action}} == delete", WithItems: volumeClaimItems, }, + { + Name: "sys-set-phase-running", + Template: "sys-update-status", + Dependencies: []string{"stateful-set"}, + Arguments: wfv1.Arguments{ + Parameters: []wfv1.Parameter{ + { + Name: "sys-workspace-phase", + Value: ptr.String(string(WorkspaceRunning)), + }, + }, + }, + When: "{{workflow.parameters.sys-workspace-action}} == create", + }, + { + Name: "sys-set-phase-pausing", + Template: "sys-update-status", + Dependencies: []string{"delete-stateful-set"}, + Arguments: wfv1.Arguments{ + Parameters: []wfv1.Parameter{ + { + Name: "sys-workspace-phase", + Value: ptr.String(string(WorkspacePausing)), + }, + }, + }, + When: "{{workflow.parameters.sys-workspace-action}} == pause", + }, + { + Name: "sys-set-phase-terminating", + Template: "sys-update-status", + Dependencies: []string{"delete-pvc"}, + Arguments: wfv1.Arguments{ + Parameters: []wfv1.Parameter{ + { + Name: "sys-workspace-phase", + Value: ptr.String(string(WorkspaceTerminating)), + }, + }, + }, + When: "{{workflow.parameters.sys-workspace-action}} == delete", + }, { Name: spec.PostExecutionWorkflow.Entrypoint, Template: spec.PostExecutionWorkflow.Entrypoint, @@ -335,24 +372,30 @@ metadata: }, }, } + // Add curl template curlPath := fmt.Sprintf("/apis/v1beta1/{{workflow.namespace}}/workspaces/{{workflow.parameters.sys-uid}}/status") status := map[string]interface{}{ - "phase": "{{input.parameters.phase}}", + "phase": "{{inputs.parameters.sys-workspace-phase}}", } statusBytes, err := json.Marshal(status) if err != nil { return } - curlNodeTemplate, err := getCURLNodeTemplate("update-workspace-status", curlPath, string(statusBytes)) + inputs := wfv1.Inputs{ + Parameters: []wfv1.Parameter{ + {Name: "sys-workspace-phase"}, + }, + } + curlNodeTemplate, err := getCURLNodeTemplate("sys-update-status", http.MethodPut, curlPath, string(statusBytes), inputs) if err != nil { return } templates = append(templates, *curlNodeTemplate) + // Add postExecutionWorkflow if it exists if spec.PostExecutionWorkflow != nil { templates = append(templates, spec.PostExecutionWorkflow.Templates...) } - // TODO: Consider storing this as a Go template in a "settings" database table workflowTemplateSpec := map[string]interface{}{ "arguments": spec.Arguments, "entrypoint": "workspace", diff --git a/server/auth/auth.go b/server/auth/auth.go index e7e482b..cce863c 100644 --- a/server/auth/auth.go +++ b/server/auth/auth.go @@ -78,6 +78,9 @@ func IsAuthorized(c *v1.Client, namespace, verb, group, resource, name string) ( return false, status.Error(codes.PermissionDenied, "Permission denied.") } allowed = review.Status.Allowed + if !allowed { + return false, status.Error(codes.PermissionDenied, "Permission denied.") + } return } diff --git a/server/workspace_server.go b/server/workspace_server.go index b7c49ec..72a17e5 100644 --- a/server/workspace_server.go +++ b/server/workspace_server.go @@ -33,7 +33,7 @@ func NewWorkspaceServer() *WorkspaceServer { func (s *WorkspaceServer) CreateWorkspace(ctx context.Context, req *api.CreateWorkspaceRequest) (*api.Workspace, error) { client := ctx.Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "apps/v1", "statefulsets", "") + allowed, err := auth.IsAuthorized(client, req.Namespace, "create", "apps", "statefulsets", "") if err != nil || !allowed { return nil, err } @@ -63,7 +63,7 @@ func (s *WorkspaceServer) CreateWorkspace(ctx context.Context, req *api.CreateWo func (s *WorkspaceServer) UpdateWorkspaceStatus(ctx context.Context, req *api.UpdateWorkspaceStatusRequest) (*empty.Empty, error) { client := ctx.Value("kubeClient").(*v1.Client) - allowed, err := auth.IsAuthorized(client, req.Namespace, "update", "apps/v1", "statefulsets", "") + allowed, err := auth.IsAuthorized(client, req.Namespace, "update", "apps", "statefulsets", "") if err != nil || !allowed { return &empty.Empty{}, err }