Files
onepanel/pkg/workspace_template.go
Andrey Melnikov ebd304b7e1 Merge pull request #883 from Vafilor/feat/filtering.updates
feat: added methods to get just the field name for resources
2021-02-26 09:49:10 -08:00

1372 lines
42 KiB
Go

package v1
import (
"database/sql"
"encoding/json"
goerrors "errors"
"fmt"
sq "github.com/Masterminds/squirrel"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/asaskevich/govalidator"
"github.com/onepanelio/core/pkg/util"
"github.com/onepanelio/core/pkg/util/ptr"
"github.com/onepanelio/core/pkg/util/request"
"github.com/onepanelio/core/pkg/util/router"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
networking "istio.io/api/networking/v1alpha3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"sigs.k8s.io/yaml"
"strings"
)
// WorkspaceDAGTemplateCreateStatefulSet is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateCreateStatefulSet = "create-stateful-set"
// WorkspaceDAGTemplateGetStatefulSet is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateGetStatefulSet = "get-stateful-set"
// WorkspaceDAGTemplateService is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateService = "service"
// WorkspaceDAGTemplateVirtualService is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateVirtualService = "virtual-service"
// WorkspaceDAGTemplateCreateWorkspace is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateCreateWorkspace = "create-workspace"
// WorkspaceDAGTemplateDeleteStatefulSet is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateDeleteStatefulSet = "delete-stateful-set"
// WorkspaceDAGTemplateDeleteWorkspace is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateDeleteWorkspace = "delete-workspace"
// WorkspaceDAGTemplateDeletePVC is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateDeletePVC = "delete-pvc"
// WorkspaceDAGTemplateSysSetPhaseRunning is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateSysSetPhaseRunning = "sys-set-phase-running"
// WorkspaceDAGTemplateSysSetPhasePaused is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateSysSetPhasePaused = "sys-set-phase-paused"
// WorkspaceDAGTemplateSysSetPhaseTerminated is used inside the DAG creation for the workspace.
const WorkspaceDAGTemplateSysSetPhaseTerminated = "sys-set-phase-terminated"
// WorkspaceServiceResource is resource used in workspace template creation.
const WorkspaceServiceResource = "service-resource"
// WorkspaceVirtualServiceResource is resource used in workspace template creation.
const WorkspaceVirtualServiceResource = "virtual-service-resource"
// WorkspaceStatefulSetResource is resource used in workspace template creation.
const WorkspaceStatefulSetResource = "stateful-set-resource"
// WorkspaceGetStatefulSetResource is resource used in workspace template creation.
const WorkspaceGetStatefulSetResource = "get-stateful-set-resource"
// WorkspaceDeleteStatefulSetResource is resource used in workspace template creation.
const WorkspaceDeleteStatefulSetResource = "delete-stateful-set-resource"
// WorkspaceResource is resource used in workspace template creation.
const WorkspaceResource = "workspace-resource"
// WorkspaceDeletePVCResource is resource used in workspace template creation.
const WorkspaceDeletePVCResource = "delete-pvc-resource"
// WorkspaceTemplateFilter represents the available ways we can filter WorkspaceTemplates
type WorkspaceTemplateFilter struct {
Labels []*Label
UID string // empty string means none
}
// GetLabels gets the labels of the filter
func (wt *WorkspaceTemplateFilter) GetLabels() []*Label {
return wt.Labels
}
func applyWorkspaceTemplateFilter(sb sq.SelectBuilder, request *request.Request) (sq.SelectBuilder, error) {
if !request.HasFilter() {
return sb, nil
}
filter, ok := request.Filter.(WorkspaceTemplateFilter)
if !ok {
return sb, nil
}
if filter.UID != "" {
sb = sb.Where(sq.Eq{
"uid": filter.UID,
})
}
sb, err := ApplyLabelSelectQuery("wt.labels", sb, &filter)
if err != nil {
return sb, err
}
return sb, nil
}
// createWorkspaceTemplateVersionDB creates a workspace template version in the database.
func createWorkspaceTemplateVersionDB(tx sq.BaseRunner, template *WorkspaceTemplate) (err error) {
err = sb.Insert("workspace_template_versions").
SetMap(sq.Eq{
"version": template.Version,
"is_latest": template.IsLatest,
"manifest": template.Manifest,
"workspace_template_id": template.ID,
"labels": template.Labels,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&template.ID)
return
}
// markWorkspaceTemplateVersionsOutdatedDB updates all of the workspace template versions in db so is_latest is false
// given the workspaceTemplateID
func markWorkspaceTemplateVersionsOutdatedDB(tx sq.BaseRunner, workspaceTemplateID uint64) (err error) {
_, err = sb.Update("workspace_template_versions").
SetMap(sq.Eq{"is_latest": false}).
Where(sq.Eq{
"workspace_template_id": workspaceTemplateID,
"is_latest": true,
}).
RunWith(tx).
Exec()
return
}
// createLatestWorkspaceTemplateVersionDB creates a new workspace template version and marks all previous versions as not latest.
func createLatestWorkspaceTemplateVersionDB(tx sq.BaseRunner, template *WorkspaceTemplate) (err error) {
if template == nil {
return fmt.Errorf("workspaceTemplate is nil")
}
if template.ID < 1 {
return fmt.Errorf("workspaceTemplate.ID is not set")
}
err = markWorkspaceTemplateVersionsOutdatedDB(tx, template.ID)
if err != nil {
return
}
template.IsLatest = true
return createWorkspaceTemplateVersionDB(tx, template)
}
func parseWorkspaceSpec(template string) (spec *WorkspaceSpec, err error) {
err = yaml.UnmarshalStrict([]byte(template), &spec)
return
}
func generateRuntimeParameters(config SystemConfig) (parameters []Parameter, err error) {
parameters = make([]Parameter, 0)
// Host
parameters = append(parameters, Parameter{
Name: "sys-host",
Value: config.Domain(),
Type: "input.hidden",
})
// Node pool parameter and options
options, err := config.NodePoolOptionsAsParameters()
if err != nil {
return nil, err
}
if len(options) == 0 {
return nil, fmt.Errorf("no node pool options in config")
}
parameters = append(parameters, Parameter{
Name: "sys-node-pool-label",
Value: config.NodePoolLabel(),
Type: "input.hidden",
})
parameters = append(parameters, Parameter{
Name: "sys-node-pool",
Value: ptr.String(options[0].Value),
Type: "select.nodepool",
Options: options,
DisplayName: ptr.String("Node pool"),
Hint: ptr.String("Name of node pool or group"),
Required: true,
})
return
}
func generateArguments(spec *WorkspaceSpec, config SystemConfig) (err error) {
systemParameters := make([]Parameter, 0)
// Resource action parameter
systemParameters = append(systemParameters, Parameter{
Name: "sys-name",
Type: "input.text",
Value: ptr.String("name"),
DisplayName: ptr.String("Workspace name"),
Hint: ptr.String("Must be between 3-30 characters, contain only alphanumeric or `-` characters"),
Required: true,
})
// TODO: These can be removed when lint validation of workflows work
// Resource action parameter
systemParameters = append(systemParameters, Parameter{
Name: "sys-resource-action",
Value: ptr.String("apply"),
Type: "input.hidden",
})
// Workspace action
systemParameters = append(systemParameters, Parameter{
Name: "sys-workspace-action",
Value: ptr.String("create"),
Type: "input.hidden",
})
// UID placeholder
systemParameters = append(systemParameters, Parameter{
Name: "sys-uid",
Value: ptr.String("uid"),
Type: "input.hidden",
})
runtimeParameters, err := generateRuntimeParameters(config)
if err != nil {
return
}
systemParameters = append(systemParameters, runtimeParameters...)
if spec.Arguments == nil {
spec.Arguments = &Arguments{
Parameters: []Parameter{},
}
}
spec.Arguments.Parameters = append(systemParameters, spec.Arguments.Parameters...)
systemVolumeParameters := make([]Parameter, 0)
// Map all the volumeClaimTemplates that have storage set
volumeStorageQuantityIsSet := make(map[string]bool)
for _, v := range spec.VolumeClaimTemplates {
if v.Spec.Resources.Requests != nil {
volumeStorageQuantityIsSet[v.ObjectMeta.Name] = true
}
}
// Volume size parameters
volumeClaimsMapped := make(map[string]bool)
// Don't generate parameters for system volumes volumeMounts
volumeClaimsMapped["sys-dshm"] = true
volumeClaimsMapped["sys-namespace-config"] = true
for _, c := range spec.Containers {
for _, v := range c.VolumeMounts {
// Skip if already mapped or storage size is set
if volumeClaimsMapped[v.Name] || volumeStorageQuantityIsSet[v.Name] {
continue
}
systemVolumeParameters = append(systemVolumeParameters, Parameter{
Name: fmt.Sprintf("sys-%v-volume-size", v.Name),
Type: "input.number",
Value: ptr.String("20480"),
DisplayName: ptr.String(fmt.Sprintf("Disk size for \"%v\"", v.Name)),
Hint: ptr.String(fmt.Sprintf("Disk size in MB for volume mounted at `%v`", v.MountPath)),
Required: true,
})
volumeClaimsMapped[v.Name] = true
}
}
spec.Arguments.Parameters = append(spec.Arguments.Parameters, systemVolumeParameters...)
return
}
func createServiceManifest(spec *WorkspaceSpec) (serviceManifest string, err error) {
service := corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: "{{workflow.parameters.sys-uid}}",
},
Spec: corev1.ServiceSpec{
Ports: spec.Ports,
Selector: map[string]string{
"app": "{{workflow.parameters.sys-uid}}",
},
},
}
serviceManifestBytes, err := yaml.Marshal(service)
if err != nil {
return
}
serviceManifest = string(serviceManifestBytes)
return
}
func createVirtualServiceManifest(spec *WorkspaceSpec) (virtualServiceManifest string, err error) {
for _, h := range spec.Routes {
for _, r := range h.Route {
r.Destination.Host = "{{workflow.parameters.sys-uid}}"
}
}
virtualService := map[string]interface{}{
"apiVersion": "networking.istio.io/v1alpha3",
"kind": "VirtualService",
"metadata": metav1.ObjectMeta{
Name: "{{workflow.parameters.sys-uid}}",
},
"spec": networking.VirtualService{
Http: spec.Routes,
Gateways: []string{"istio-system/ingressgateway"},
Hosts: []string{"{{workflow.parameters.sys-host}}"},
},
}
virtualServiceManifestBytes, err := yaml.Marshal(virtualService)
if err != nil {
return
}
virtualServiceManifest = string(virtualServiceManifestBytes)
return
}
func createStatefulSetManifest(spec *WorkspaceSpec) (statefulSetManifest string, err error) {
var volumeClaims []map[string]interface{}
volumeClaimsMapped := make(map[string]bool)
// Add volumeClaims that the user has added first
for _, v := range spec.VolumeClaimTemplates {
if volumeClaimsMapped[v.ObjectMeta.Name] {
continue
}
// Use the `onepanel` storage class instead of default
if v.Spec.StorageClassName == nil {
v.Spec.StorageClassName = ptr.String("onepanel")
}
// Check if storage is set or if it needs to be dynamic
var storage interface{} = fmt.Sprintf("{{workflow.parameters.sys-%v-volume-size}}Mi", v.Name)
if v.Spec.Resources.Requests != nil {
storage = v.Spec.Resources.Requests["storage"]
}
volumeClaims = append(volumeClaims, map[string]interface{}{
"metadata": metav1.ObjectMeta{
Name: v.ObjectMeta.Name,
},
"spec": map[string]interface{}{
"accessModes": v.Spec.AccessModes,
"storageClassName": v.Spec.StorageClassName,
"resources": map[string]interface{}{
"requests": map[string]interface{}{
"storage": storage,
},
},
},
})
volumeClaimsMapped[v.ObjectMeta.Name] = true
}
// Don't generate a volumeClaimTemplate for system volumes volumeMounts
volumeClaimsMapped["sys-dshm"] = true
volumeClaimsMapped["sys-namespace-config"] = true
// Automatically map the remaining ones
for i, c := range spec.Containers {
container := &spec.Containers[i]
for _, v := range c.VolumeMounts {
if volumeClaimsMapped[v.Name] {
continue
}
volumeClaims = append(volumeClaims, map[string]interface{}{
"metadata": metav1.ObjectMeta{
Name: v.Name,
},
"spec": map[string]interface{}{
"accessModes": []corev1.PersistentVolumeAccessMode{
"ReadWriteOnce",
},
"storageClassName": ptr.String("onepanel"),
"resources": map[string]interface{}{
"requests": map[string]string{
"storage": fmt.Sprintf("{{workflow.parameters.sys-%v-volume-size}}Mi", v.Name),
},
},
},
})
volumeClaimsMapped[v.Name] = true
}
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "sys-dshm",
MountPath: "/dev/shm",
})
}
template := corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "{{workflow.parameters.sys-uid}}",
"onepanel.io/entity-type": "Workspace",
"onepanel.io/entity-uid": "{{workflow.parameters.sys-uid}}",
},
},
Spec: corev1.PodSpec{
NodeSelector: map[string]string{
"{{workflow.parameters.sys-node-pool-label}}": "{{workflow.parameters.sys-node-pool}}",
},
ImagePullSecrets: spec.ImagePullSecrets,
Containers: spec.Containers,
Volumes: []corev1.Volume{
{
Name: "sys-dshm",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{
Medium: corev1.StorageMediumMemory,
},
},
},
{
Name: "sys-namespace-config",
VolumeSource: corev1.VolumeSource{
Projected: &corev1.ProjectedVolumeSource{
Sources: []corev1.VolumeProjection{
{
ConfigMap: &corev1.ConfigMapProjection{
LocalObjectReference: corev1.LocalObjectReference{
Name: "onepanel",
},
},
},
{
Secret: &corev1.SecretProjection{
LocalObjectReference: corev1.LocalObjectReference{
Name: "onepanel",
},
},
},
},
},
},
},
},
},
}
statefulSet := map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "StatefulSet",
"metadata": metav1.ObjectMeta{
Name: "{{workflow.parameters.sys-uid}}",
},
"spec": map[string]interface{}{
"replicas": 1,
"serviceName": "{{workflow.parameters.sys-uid}}",
"selector": &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "{{workflow.parameters.sys-uid}}",
},
},
"template": template,
"volumeClaimTemplates": volumeClaims,
},
}
statefulSetManifestBytes, err := yaml.Marshal(statefulSet)
if err != nil {
return
}
statefulSetManifest = string(statefulSetManifestBytes)
return
}
func createWorkspaceManifest(spec *WorkspaceSpec) (workspaceManifest string, err error) {
// TODO: This needs to be a Kubernetes Go struct
// TODO: labels should be persisted here as well
workspace := map[string]interface{}{
"apiVersion": "onepanel.io/v1alpha1",
"kind": "Workspace",
"metadata": metav1.ObjectMeta{
Name: "{{workflow.parameters.sys-uid}}",
},
}
workspaceManifestBytes, err := yaml.Marshal(workspace)
if err != nil {
return
}
workspaceManifest = string(workspaceManifestBytes)
return
}
func unmarshalWorkflowTemplate(spec *WorkspaceSpec, serviceManifest, virtualServiceManifest, statefulSetManifest, workspaceManifest string) (workflowTemplateSpecManifest string, err error) {
var volumeClaimItems []wfv1.Item
volumeClaimsMapped := make(map[string]bool)
for _, c := range spec.Containers {
for _, v := range c.VolumeMounts {
if volumeClaimsMapped[v.Name] {
continue
}
item, err := wfv1.ParseItem(fmt.Sprintf(`"%v"`, v.Name))
if err != nil {
return "", err
}
volumeClaimItems = append(volumeClaimItems, item)
volumeClaimsMapped[v.Name] = true
}
}
getStatefulSetManifest := `apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{workflow.parameters.sys-uid}}
`
deletePVCManifest := `apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{inputs.parameters.sys-pvc-name}}-{{workflow.parameters.sys-uid}}-0
`
templates := []wfv1.Template{
{
Name: "handleExit",
DAG: &wfv1.DAGTemplate{
Tasks: []wfv1.DAGTask{
{
Name: "exit-handler",
Template: "sys-update-workspace-status",
},
},
},
},
{
Name: "workspace",
DAG: &wfv1.DAGTemplate{
FailFast: ptr.Bool(false),
Tasks: []wfv1.DAGTask{
{
Name: WorkspaceDAGTemplateService,
Template: WorkspaceServiceResource,
},
{
Name: WorkspaceDAGTemplateVirtualService,
Template: WorkspaceVirtualServiceResource,
Dependencies: []string{WorkspaceDAGTemplateService},
},
{
Name: WorkspaceDAGTemplateCreateStatefulSet,
Template: WorkspaceStatefulSetResource,
Dependencies: []string{WorkspaceDAGTemplateVirtualService},
When: "{{workflow.parameters.sys-workspace-action}} == create || {{workflow.parameters.sys-workspace-action}} == update",
},
{
Name: WorkspaceDAGTemplateGetStatefulSet,
Template: WorkspaceGetStatefulSetResource,
Dependencies: []string{WorkspaceDAGTemplateCreateStatefulSet},
When: "{{workflow.parameters.sys-workspace-action}} == create || {{workflow.parameters.sys-workspace-action}} == update",
Arguments: wfv1.Arguments{
Parameters: []wfv1.Parameter{
{
Name: "update-revision",
Value: wfv1.AnyStringPtr("{{tasks.create-stateful-set.outputs.parameters.update-revision}}"),
},
},
},
},
{
Name: WorkspaceDAGTemplateCreateWorkspace,
Template: WorkspaceResource,
Dependencies: []string{WorkspaceDAGTemplateGetStatefulSet},
When: "{{workflow.parameters.sys-workspace-action}} == create || {{workflow.parameters.sys-workspace-action}} == update",
},
{
Name: WorkspaceDAGTemplateDeleteStatefulSet,
Template: WorkspaceDeleteStatefulSetResource,
Dependencies: []string{WorkspaceDAGTemplateVirtualService},
When: "{{workflow.parameters.sys-workspace-action}} == pause || {{workflow.parameters.sys-workspace-action}} == delete",
},
{
Name: WorkspaceDAGTemplateDeleteWorkspace,
Template: WorkspaceResource,
Dependencies: []string{WorkspaceDAGTemplateDeleteStatefulSet},
When: "{{workflow.parameters.sys-workspace-action}} == pause || {{workflow.parameters.sys-workspace-action}} == delete",
},
{
Name: WorkspaceDAGTemplateDeletePVC,
Template: WorkspaceDeletePVCResource,
Dependencies: []string{WorkspaceDAGTemplateDeleteWorkspace},
Arguments: wfv1.Arguments{
Parameters: []wfv1.Parameter{
{
Name: "sys-pvc-name",
Value: wfv1.AnyStringPtr("{{item}}"),
},
},
},
When: "{{workflow.parameters.sys-workspace-action}} == delete",
WithItems: volumeClaimItems,
},
{
Name: WorkspaceDAGTemplateSysSetPhaseRunning,
Template: "sys-update-status",
Dependencies: []string{WorkspaceDAGTemplateCreateWorkspace},
Arguments: wfv1.Arguments{
Parameters: []wfv1.Parameter{
{
Name: "sys-workspace-phase",
Value: wfv1.AnyStringPtr(WorkspaceRunning),
},
},
},
When: "{{workflow.parameters.sys-workspace-action}} == create || {{workflow.parameters.sys-workspace-action}} == update",
},
{
Name: WorkspaceDAGTemplateSysSetPhasePaused,
Template: "sys-update-status",
Dependencies: []string{WorkspaceDAGTemplateDeleteWorkspace},
Arguments: wfv1.Arguments{
Parameters: []wfv1.Parameter{
{
Name: "sys-workspace-phase",
Value: wfv1.AnyStringPtr(WorkspacePaused),
},
},
},
When: "{{workflow.parameters.sys-workspace-action}} == pause",
},
{
Name: WorkspaceDAGTemplateSysSetPhaseTerminated,
Template: "sys-update-status",
Dependencies: []string{WorkspaceDAGTemplateDeletePVC},
Arguments: wfv1.Arguments{
Parameters: []wfv1.Parameter{
{
Name: "sys-workspace-phase",
Value: wfv1.AnyStringPtr(WorkspaceTerminated),
},
},
},
When: "{{workflow.parameters.sys-workspace-action}} == delete",
},
},
},
},
{
Name: WorkspaceServiceResource,
Resource: &wfv1.ResourceTemplate{
Action: "{{workflow.parameters.sys-resource-action}}",
Manifest: serviceManifest,
},
},
{
Name: WorkspaceVirtualServiceResource,
Resource: &wfv1.ResourceTemplate{
Action: "{{workflow.parameters.sys-resource-action}}",
Manifest: virtualServiceManifest,
},
},
{
Name: WorkspaceStatefulSetResource,
Resource: &wfv1.ResourceTemplate{
Action: "{{workflow.parameters.sys-resource-action}}",
Manifest: statefulSetManifest,
SuccessCondition: "status.readyReplicas > 0",
},
Outputs: wfv1.Outputs{
Parameters: []wfv1.Parameter{
{
Name: "update-revision",
ValueFrom: &wfv1.ValueFrom{
JSONPath: "{.status.updateRevision}",
},
},
},
},
},
{
Name: WorkspaceGetStatefulSetResource,
Inputs: wfv1.Inputs{
Parameters: []wfv1.Parameter{{Name: "update-revision"}},
},
Resource: &wfv1.ResourceTemplate{
Action: "get",
Manifest: getStatefulSetManifest,
SuccessCondition: "status.readyReplicas > 0, status.currentRevision == {{inputs.parameters.update-revision}}",
},
},
{
Name: WorkspaceDeleteStatefulSetResource,
Resource: &wfv1.ResourceTemplate{
Action: "{{workflow.parameters.sys-resource-action}}",
Manifest: statefulSetManifest,
},
},
{
Name: WorkspaceResource,
Resource: &wfv1.ResourceTemplate{
Action: "{{workflow.parameters.sys-resource-action}}",
Manifest: workspaceManifest,
},
},
{
Name: WorkspaceDeletePVCResource,
Inputs: wfv1.Inputs{
Parameters: []wfv1.Parameter{{Name: "sys-pvc-name"}},
},
Resource: &wfv1.ResourceTemplate{
Action: "{{workflow.parameters.sys-resource-action}}",
Manifest: deletePVCManifest,
},
},
}
webRouter, err := router.NewRelativeAPIRouter()
if err != nil {
return "", err
}
curlUpdateWorkspaceStatusPath := webRouter.UpdateWorkspaceStatus("{{workflow.namespace}}", "{{workflow.parameters.sys-uid}}")
status := map[string]interface{}{
"phase": "{{workflow.status}}",
}
statusBytes, err := json.Marshal(status)
if err != nil {
return
}
inputs := wfv1.Inputs{}
curlUpdateWorkspaceNodeTemplate, err := getCURLNodeTemplate("sys-update-workspace-status", http.MethodPut, curlUpdateWorkspaceStatusPath, string(statusBytes), inputs)
if err != nil {
return
}
templates = append(templates, *curlUpdateWorkspaceNodeTemplate)
// Add curl template
curlPath := fmt.Sprintf("/apis/v1beta1/{{workflow.namespace}}/workspaces/{{workflow.parameters.sys-uid}}/status")
status = map[string]interface{}{
"phase": "{{inputs.parameters.sys-workspace-phase}}",
}
statusBytes, err = json.Marshal(status)
if err != nil {
return
}
inputs = wfv1.Inputs{
Parameters: []wfv1.Parameter{
{Name: "sys-workspace-phase"},
},
}
curlNodeTemplate, err := getCURLNodeTemplate("sys-update-status", http.MethodPut, curlPath, string(statusBytes), inputs)
if err != nil {
return
}
templates = append(templates, *curlNodeTemplate)
// Add postExecutionWorkflow if it exists
if spec.PostExecutionWorkflow != nil {
dag := wfv1.DAGTask{
Name: spec.PostExecutionWorkflow.Entrypoint,
Template: spec.PostExecutionWorkflow.Entrypoint,
Dependencies: []string{WorkspaceDAGTemplateSysSetPhaseRunning, WorkspaceDAGTemplateSysSetPhasePaused, WorkspaceDAGTemplateSysSetPhaseTerminated},
}
templates[1].DAG.Tasks = append(templates[1].DAG.Tasks, dag)
templates = append(templates, spec.PostExecutionWorkflow.Templates...)
}
workflowTemplateSpec := map[string]interface{}{
"arguments": spec.Arguments,
"entrypoint": "workspace",
"onExit": "handleExit",
"templates": templates,
"imagePullSecrets": spec.ImagePullSecrets,
}
workflowTemplateSpecManifestBytes, err := yaml.Marshal(workflowTemplateSpec)
if err != nil {
return
}
workflowTemplateSpecManifest = string(workflowTemplateSpecManifestBytes)
return
}
func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *WorkspaceTemplate) (*WorkspaceTemplate, error) {
err := workspaceTemplate.GenerateUID(workspaceTemplate.Name)
if err != nil {
return nil, err
}
workspaceTemplate.WorkflowTemplate.IsSystem = true
workspaceTemplate.WorkflowTemplate.Resource = ptr.String(TypeWorkspaceTemplate)
workspaceTemplate.WorkflowTemplate.ResourceUID = &workspaceTemplate.UID
// validate workflow template
if err := c.validateWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate); err != nil {
message := strings.Replace(err.Error(), "{{workflow.", "{{workspace.", -1)
return nil, util.NewUserError(codes.InvalidArgument, message)
}
workspaceTemplate.WorkflowTemplate, _, err = c.createWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkspaceTemplate": workspaceTemplate,
"Error": err.Error(),
}).Error("Could not create workflow template for workspace.")
return nil, util.NewUserErrorWrap(err, "Workflow template")
}
workspaceTemplate.Version = workspaceTemplate.WorkflowTemplate.Version
workspaceTemplate.IsLatest = true
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
err = sb.Insert("workspace_templates").
SetMap(sq.Eq{
"uid": workspaceTemplate.UID,
"name": workspaceTemplate.Name,
"description": workspaceTemplate.Description,
"namespace": namespace,
"workflow_template_id": workspaceTemplate.WorkflowTemplate.ID,
"labels": workspaceTemplate.Labels,
}).
Suffix("RETURNING id, created_at").
RunWith(tx).
QueryRow().
Scan(&workspaceTemplate.ID, &workspaceTemplate.CreatedAt)
if err != nil {
_, errCleanUp := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
errorMsg := "Error with insert into workspace_templates. "
if errCleanUp != nil {
errorMsg += "Error with clean-up: ArchiveWorkflowTemplate. "
errorMsg += errCleanUp.Error()
}
return nil, util.NewUserErrorWrap(err, errorMsg) //return the source error
}
err = createWorkspaceTemplateVersionDB(tx, workspaceTemplate)
if err != nil {
errorMsg := "Error with insert into workspace_templates_versions. "
_, errCleanUp := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
if errCleanUp != nil {
err = fmt.Errorf("%w; %s", err, errCleanUp)
errorMsg += "Error with clean-up: ArchiveWorkflowTemplate. "
}
return nil, util.NewUserErrorWrap(err, errorMsg) // return the source error
}
if err = tx.Commit(); err != nil {
_, errArchive := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
if errArchive != nil {
err = fmt.Errorf("%w; %s", err, errArchive)
}
return nil, err
}
return workspaceTemplate, nil
}
func (c *Client) workspaceTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select(getWorkspaceTemplateColumns("wt")...).
From("workspace_templates wt").
Where(sq.Eq{
"wt.namespace": namespace,
})
return sb
}
func (c *Client) workspaceTemplateVersionsSelectBuilder(namespace, uid string) sq.SelectBuilder {
sb := sb.Select(getWorkspaceTemplateColumnsWithoutLabels("wt")...).
From("workspace_templates wt").
Columns("wtv.id \"workspace_template_version_id\"", "wtv.created_at \"created_at\"", "wtv.version", "wtv.manifest", "wtv.labels", "wft.id \"workflow_template.id\"", "wft.uid \"workflow_template.uid\"", "wftv.version \"workflow_template.version\"", "wftv.manifest \"workflow_template.manifest\"").
Join("workspace_template_versions wtv ON wtv.workspace_template_id = wt.id").
Join("workflow_templates wft ON wft.id = wt.workflow_template_id").
Join("workflow_template_versions wftv ON wftv.workflow_template_id = wft.id").
Where(sq.Eq{
"wt.uid": uid,
"wt.namespace": namespace,
})
return sb
}
func (c *Client) getWorkspaceTemplateByName(namespace, name string) (workspaceTemplate *WorkspaceTemplate, err error) {
workspaceTemplate = &WorkspaceTemplate{}
sb := c.workspaceTemplatesSelectBuilder(namespace).
Where(sq.Eq{
"wt.name": name,
"is_archived": false,
}).
Limit(1)
query, args, err := sb.ToSql()
if err != nil {
return
}
if err = c.DB.Get(workspaceTemplate, query, args...); err == sql.ErrNoRows {
err = nil
workspaceTemplate = nil
}
return
}
func (c *Client) generateWorkspaceTemplateWorkflowTemplate(workspaceTemplate *WorkspaceTemplate) (workflowTemplate *WorkflowTemplate, err error) {
if workspaceTemplate == nil || workspaceTemplate.Manifest == "" {
return nil, util.NewUserError(codes.InvalidArgument, "Workspace template manifest is required")
}
config, err := c.GetSystemConfig()
if err != nil {
return nil, err
}
workspaceSpec, err := parseWorkspaceSpec(workspaceTemplate.Manifest)
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
if workspaceSpec.Arguments != nil {
modifiedParameters, err := c.replaceSysNodePoolOptions(workspaceSpec.Arguments.Parameters)
if err != nil {
return nil, err
}
workspaceSpec.Arguments.Parameters = modifiedParameters
}
if err = generateArguments(workspaceSpec, config); err != nil {
return nil, err
}
serviceManifest, err := createServiceManifest(workspaceSpec)
if err != nil {
return nil, err
}
virtualServiceManifest, err := createVirtualServiceManifest(workspaceSpec)
if err != nil {
return nil, err
}
statefulSetManifest, err := createStatefulSetManifest(workspaceSpec)
if err != nil {
return nil, err
}
workspaceManifest, err := createWorkspaceManifest(workspaceSpec)
if err != nil {
return nil, err
}
workflowTemplateManifest, err := unmarshalWorkflowTemplate(workspaceSpec, serviceManifest, virtualServiceManifest, statefulSetManifest, workspaceManifest)
if err != nil {
return nil, err
}
workflowTemplateManifest = strings.NewReplacer(
"{{workspace.parameters.", "{{workflow.parameters.").Replace(workflowTemplateManifest)
workflowTemplate = &WorkflowTemplate{
Name: workspaceTemplate.Name,
Manifest: workflowTemplateManifest,
}
return workflowTemplate, nil
}
// GenerateWorkspaceTemplateWorkflowTemplate generates and returns a workflowTemplate for a given workspaceTemplate manifest
func (c *Client) GenerateWorkspaceTemplateWorkflowTemplate(workspaceTemplate *WorkspaceTemplate) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate, err = c.generateWorkspaceTemplateWorkflowTemplate(workspaceTemplate)
if err != nil {
return nil, err
}
return workflowTemplate, nil
}
// CreateWorkspaceTemplate creates a template for Workspaces
func (c *Client) CreateWorkspaceTemplate(namespace string, workspaceTemplate *WorkspaceTemplate) (*WorkspaceTemplate, error) {
_, err := govalidator.ValidateStruct(workspaceTemplate)
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
workspaceTemplate.Namespace = namespace
existingWorkspaceTemplate, err := c.getWorkspaceTemplateByName(namespace, workspaceTemplate.Name)
if err != nil {
return nil, err
}
if existingWorkspaceTemplate != nil {
message := fmt.Sprintf("Workspace template with the name '%v' already exists", workspaceTemplate.Name)
if existingWorkspaceTemplate.IsArchived {
message = fmt.Sprintf("An archived workspace template with the name '%v' already exists", workspaceTemplate.Name)
}
return nil, util.NewUserError(codes.AlreadyExists, message)
}
workspaceTemplate.WorkflowTemplate, err = c.generateWorkspaceTemplateWorkflowTemplate(workspaceTemplate)
if err != nil {
return nil, err
}
workspaceTemplate, err = c.createWorkspaceTemplate(namespace, workspaceTemplate)
if err != nil {
var statusError *util.UserError
if goerrors.As(err, &statusError) && statusError.Code == codes.InvalidArgument {
return nil, util.NewUserError(statusError.Code, strings.Replace(statusError.Message, "{{workflow.", "{{workspace.", -1))
}
return nil, err
}
return workspaceTemplate, nil
}
// GetWorkspaceTemplate return a workspaceTemplate and its corresponding workflowTemplate
// if version is 0, the latest version is returned.
func (c *Client) GetWorkspaceTemplate(namespace, uid string, version int64) (workspaceTemplate *WorkspaceTemplate, err error) {
sb := c.workspaceTemplateVersionsSelectBuilder(namespace, uid).
Limit(1)
sb = sb.Where(sq.Eq{"wt.is_archived": false})
if version == 0 {
sb = sb.Where(sq.Eq{
"wtv.is_latest": true,
"wftv.is_latest": true,
})
} else {
sb = sb.Where(sq.Eq{
"wtv.version": version,
"wftv.version": version,
})
}
workspaceTemplate = &WorkspaceTemplate{}
if err = c.DB.Getx(workspaceTemplate, sb); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return
}
sysConfig, err := c.GetSystemConfig()
if err != nil {
return nil, err
}
if err := workspaceTemplate.InjectRuntimeParameters(sysConfig); err != nil {
return nil, err
}
return
}
// UpdateWorkspaceTemplate adds a new workspace template version
func (c *Client) UpdateWorkspaceTemplate(namespace string, workspaceTemplate *WorkspaceTemplate) (*WorkspaceTemplate, error) {
existingWorkspaceTemplate, err := c.GetWorkspaceTemplate(namespace, workspaceTemplate.UID, workspaceTemplate.Version)
if err != nil {
return nil, err
}
if existingWorkspaceTemplate == nil {
return nil, util.NewUserError(codes.NotFound, "Workspace template not found.")
}
workspaceTemplate.ID = existingWorkspaceTemplate.ID
workspaceTemplate.Name = existingWorkspaceTemplate.UID
workspaceTemplate.Namespace = existingWorkspaceTemplate.Namespace
updatedWorkflowTemplate, err := c.generateWorkspaceTemplateWorkflowTemplate(workspaceTemplate)
if err != nil {
return nil, err
}
updatedWorkflowTemplate.ID = existingWorkspaceTemplate.WorkflowTemplate.ID
updatedWorkflowTemplate.UID = existingWorkspaceTemplate.WorkflowTemplate.UID
updatedWorkflowTemplate.Labels = workspaceTemplate.Labels
workflowTemplateVersion, err := c.CreateWorkflowTemplateVersion(namespace, updatedWorkflowTemplate)
if err != nil {
var statusError *util.UserError
if goerrors.As(err, &statusError) && statusError.Code == codes.InvalidArgument {
return nil, util.NewUserError(statusError.Code, strings.Replace(statusError.Message, "{{workflow.", "{{workspace.", -1))
}
return nil, err
}
// TODO - this might not be needed with recent changes made.
workspaceTemplate.Version = workflowTemplateVersion.Version
workspaceTemplate.IsLatest = true
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
if err := createLatestWorkspaceTemplateVersionDB(tx, workspaceTemplate); err != nil {
return nil, err
}
_, err = sb.Update("workspace_templates").
SetMap(sq.Eq{
"labels": workspaceTemplate.Labels,
"description": workspaceTemplate.Description,
}).
Where(sq.Eq{
"uid": workspaceTemplate.UID,
"namespace": workspaceTemplate.Namespace,
}).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return workspaceTemplate, nil
}
// UpdateWorkspaceTemplateManifest updates a workspace template by creating a new version where the only difference is the manifest
func (c *Client) UpdateWorkspaceTemplateManifest(namespace, uid string, manifest string) (*WorkspaceTemplate, error) {
existingTemplate, err := c.GetWorkspaceTemplate(namespace, uid, 0)
if err != nil {
return nil, err
}
existingTemplate.UID = uid
existingTemplate.Manifest = manifest
return c.UpdateWorkspaceTemplate(namespace, existingTemplate)
}
// ListWorkspaceTemplates returns a list of workspace templates that are not archived, sorted by most recent created first
func (c *Client) ListWorkspaceTemplates(namespace string, request *request.Request) (workspaceTemplates []*WorkspaceTemplate, err error) {
sb := c.workspaceTemplatesSelectBuilder(namespace).
Where(sq.Eq{
"wt.is_archived": false,
})
if request.HasSorting() {
properties := getWorkspaceTemplateColumnsMap(true)
for _, order := range request.Sort.Properties {
if columnName, ok := properties[order.Property]; ok {
nullSort := "NULLS FIRST"
if order.Direction == "desc" {
nullSort = "NULLS LAST" // default in postgres, but let's be explicit
}
sb = sb.OrderBy(fmt.Sprintf("wt.%v %v %v", columnName, order.Direction, nullSort))
}
}
} else {
sb = sb.OrderBy("wt.created_at DESC")
}
sb, err = applyWorkspaceTemplateFilter(sb, request)
if err != nil {
return nil, err
}
sb = *request.ApplyPaginationToSelect(&sb)
err = c.DB.Selectx(&workspaceTemplates, sb)
return
}
// ListWorkspaceTemplateVersions returns an array of WorkspaceTemplates with the version information loaded. Latest id is first.
func (c *Client) ListWorkspaceTemplateVersions(namespace, uid string) (workspaceTemplates []*WorkspaceTemplate, err error) {
sb := c.workspaceTemplateVersionsSelectBuilder(namespace, uid).
Options("DISTINCT ON (wtv.version) wtv.version,").
Where(sq.Eq{
"wt.is_archived": false,
"wft.is_archived": false,
}).
OrderBy("wtv.version DESC")
err = c.DB.Selectx(&workspaceTemplates, sb)
return
}
// CountWorkspaceTemplates returns the total number of non-archived workspace templates for the input namespace
func (c *Client) CountWorkspaceTemplates(namespace string) (count int, err error) {
err = sb.Select("count(*)").
From("workspace_templates wt").
Where(sq.Eq{
"wt.namespace": namespace,
"wt.is_archived": false,
}).
RunWith(c.DB).
QueryRow().
Scan(&count)
return
}
// archiveWorkspaceTemplateDB marks the Workspace template identified by (namespace, uid) and is_archived=false, as archived.
//
// This method returns (true, nil) when the database record was successfully archived.
// If there was no record to archive, (false, nil) is returned.
func (c *Client) archiveWorkspaceTemplateDB(namespace, uid string) (archived bool, err error) {
result, err := sb.Update("workspace_templates").
Set("is_archived", true).
Where(sq.Eq{
"uid": uid,
"namespace": namespace,
"is_archived": false,
}).
RunWith(c.DB).
Exec()
if err != nil {
return false, err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return false, err
}
if rowsAffected == 0 {
return false, nil
}
return true, nil
}
// WorkspaceTemplateHasRunningWorkspaces returns true if there are non-terminated (or terminating) workspaces that are
// based of this template. False otherwise.
func (c *Client) WorkspaceTemplateHasRunningWorkspaces(namespace string, uid string) (bool, error) {
runningCount := 0
err := sb.Select("COUNT(*)").
From("workspaces w").
Join("workspace_templates wt ON wt.id = w.workspace_template_id").
Where(sq.And{
sq.Eq{
"wt.namespace": namespace,
"wt.uid": uid,
}, sq.NotEq{
"w.phase": []string{"Terminated"},
}}).
RunWith(c.DB).
QueryRow().
Scan(&runningCount)
if err != nil {
return false, err
}
return runningCount > 0, nil
}
// ArchiveWorkspaceTemplate archives and deletes resources associated with the workspace template.
//
// In particular, this action
//
// * Archives each workspace (k8s cleaned-up, database entry marked archived)
//
// * Marks associated Workflow template as archived
//
// * Marks associated Workflow executions as archived
//
// * Deletes Workflow Executions in k8s
func (c *Client) ArchiveWorkspaceTemplate(namespace string, uid string) (archived bool, err error) {
wsTemp, err := c.GetWorkspaceTemplate(namespace, uid, 0)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Get Workspace Template failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workspace template.")
}
if wsTemp == nil {
return false, fmt.Errorf("not found")
}
wsList, err := c.ListWorkspacesByTemplateID(namespace, wsTemp.WorkspaceTemplateVersionID)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("ListWorkspacesByTemplateID failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workspace template.")
}
for _, ws := range wsList {
err = c.ArchiveWorkspace(namespace, ws.UID)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("ArchiveWorkspace failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workspace template.")
}
}
_, err = c.archiveWorkspaceTemplateDB(namespace, wsTemp.UID)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Archive Workspace Template DB Failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workspace template.")
}
_, err = c.ArchiveWorkflowTemplate(namespace, wsTemp.UID)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Archive Workflow Template Failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workspace template.")
}
return true, nil
}
// ListWorkspaceTemplatesField loads all of the distinct field values for workspace templates
func (c *Client) ListWorkspaceTemplatesField(namespace, field string) (value []string, err error) {
if field != "name" {
return nil, fmt.Errorf("unsupported field '%v'", field)
}
columnName := fmt.Sprintf("wt.%v", field)
sb := sb.Select(columnName).
Distinct().
From("workspace_templates wt").
Where(sq.Eq{
"wt.namespace": namespace,
}).OrderBy(columnName)
err = c.DB.Selectx(&value, sb)
return
}