diff --git a/pkg/workflow_execution.go b/pkg/workflow_execution.go
index 8a5df6f..87dd278 100644
--- a/pkg/workflow_execution.go
+++ b/pkg/workflow_execution.go
@@ -18,6 +18,7 @@ import (
 	"gopkg.in/yaml.v2"
 	"io"
 	"io/ioutil"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/watch"
 	"net/http"
 	"strconv"
@@ -208,6 +209,106 @@ func injectArtifactRepositoryConfig(artifact *wfv1.Artifact, namespaceConfig *Na
 	}
 }
 
+// injectContainerResourceQuotas sets resource requests, and GPU limits when
+// available, on the template's container and script container. The amounts are
+// derived from a running node that matches the template's instance-type node
+// selector.
+//
+// The instance type is taken from the first supported label that has a value in
+// template.NodeSelector. A value containing "{{workflow." is treated as a
+// workflow parameter reference and resolved against wf.Spec.Arguments.Parameters.
+// CPU and memory requests are set to 90% of the matching node's allocatable
+// amounts; if the node advertises NVIDIA or AMD GPUs, all of them are set as a
+// limit.
+//
+// The template is left untouched when it has no supported selector or when no
+// listed node carries the selected label value. The only error returned is a
+// failure to list nodes. systemConfig is not used.
+func (c *Client) injectContainerResourceQuotas(wf *wfv1.Workflow, template *wfv1.Template, systemConfig SystemConfig) error {
+	if template.NodeSelector == nil {
+		return nil
+	}
+
+	// Probe the labels in a fixed order: ranging over the selector map would make
+	// the choice depend on Go's randomized map iteration when both are set.
+	supportedNodePoolLabels := []string{"beta.kubernetes.io/instance-type", "node.kubernetes.io/instance-type"}
+	nodePoolLabel := ""
+	value := ""
+	for _, label := range supportedNodePoolLabels {
+		if v := template.NodeSelector[label]; v != "" {
+			nodePoolLabel = label
+			value = v
+			break
+		}
+	}
+	if value == "" {
+		return nil
+	}
+
+	// Resolve e.g. "{{workflow.parameters.node-pool}}" to that parameter's value.
+	// If no such parameter is set, value keeps the raw template text and simply
+	// will not match any node below.
+	if strings.Contains(value, "{{workflow.") {
+		parts := strings.Split(strings.Replace(value, "}}", "", -1), ".")
+		paramName := strings.TrimSpace(parts[len(parts)-1])
+		for _, param := range wf.Spec.Arguments.Parameters {
+			if param.Name == paramName && param.Value != nil {
+				value = *param.Value
+				break
+			}
+		}
+	}
+
+	runningNodes, err := c.Interface.CoreV1().Nodes().List(ListOptions{})
+	if err != nil {
+		return err
+	}
+
+	for i := range runningNodes.Items {
+		node := &runningNodes.Items[i]
+		if node.Labels[nodePoolLabel] != value {
+			continue
+		}
+		allocatable := node.Status.Allocatable
+
+		// Request 90% of what the node can allocate. Memory is read in bytes
+		// (Value, not MilliValue) and rounded down to a whole number of Ki.
+		cpuMilli := allocatable.Cpu().MilliValue() * 9 / 10
+		memoryKi := allocatable.Memory().Value() * 9 / 10 / 1024
+		resourceList := corev1.ResourceRequirements{
+			Requests: corev1.ResourceList{
+				corev1.ResourceCPU:    *resource.NewMilliQuantity(cpuMilli, resource.DecimalSI),
+				corev1.ResourceMemory: *resource.NewQuantity(memoryKi*1024, resource.BinarySI),
+			},
+		}
+
+		// Nvidia is checked first, then AMD, so AMD wins if a node reports both.
+		// AMD resource name source:
+		// https://github.com/RadeonOpenCompute/k8s-device-plugin/blob/master/example/pod/alexnet-gpu.yaml
+		//todo disable this, not working with gpu nodes
+		for _, gpuManufacturer := range []corev1.ResourceName{"nvidia.com/gpu", "amd.com/gpu"} {
+			gpuQuantity := allocatable[gpuManufacturer]
+			if gpu := gpuQuantity.Value(); gpu > 0 {
+				// GPUs are set as a limit, typed directly as a Quantity.
+				resourceList.Limits = corev1.ResourceList{
+					gpuManufacturer: *resource.NewQuantity(gpu, resource.DecimalSI),
+				}
+			}
+		}
+
+		if template.Container != nil {
+			template.Container.Resources = resourceList
+		}
+		if template.Script != nil {
+			template.Script.Container.Resources = resourceList
+		}
+		// Use the first matching node only, so CPU, memory and GPU values all
+		// come from the same node.
+		return nil
+	}
+	return nil
+}
+
 func injectEnvironmentVariables(container *corev1.Container, systemConfig SystemConfig) {
 	//Generate ENV vars from secret, if there is a container present in the workflow
 	//Get template ENV vars, avoid over-writing them with secret values
@@ -275,11 +376,18 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
 				Name:      "sys-dshm",
 				MountPath: "/dev/shm",
 			})
-
+			err = c.injectContainerResourceQuotas(wf, template, systemConfig)
+			if err != nil {
+				return err
+			}
 			injectEnvironmentVariables(template.Container, systemConfig)
 		}
 		if template.Script != nil {
+			err = c.injectContainerResourceQuotas(wf, template, systemConfig)
+			if err != nil {
+				return err
+			}
 			injectEnvironmentVariables(&template.Script.Container, systemConfig)
 		}