Adding back injectContainerResourceQuotas for workflows.

- This time, the node capacity is grabbed from running nodes instead
of information from configmap (which is grabbed from params.yaml)
- Added support for two different instance-type keys.
Note that GPU support does not work at this time.
- There is no ResourceName "nvidia.com/gpu" in library code, so it throws
a deref nil error.
This commit is contained in:
Aleksandr Melnikov
2020-09-24 17:28:54 -07:00
parent bafd371e11
commit dca6db842c

View File

@@ -18,6 +18,7 @@ import (
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/watch"
"net/http"
"strconv"
@@ -208,6 +209,108 @@ func injectArtifactRepositoryConfig(artifact *wfv1.Artifact, namespaceConfig *Na
}
}
// injectContainerResourceQuotas adds resource requests and limits if they exist
// Code grabs the resource request information from the nodeSelector, compared against running nodes.
// If the running node is not present, no resource information is retrieved.
func (c *Client) injectContainerResourceQuotas(wf *wfv1.Workflow, template *wfv1.Template, systemConfig SystemConfig) error {
if template.NodeSelector == nil {
return nil
}
supportedNodePoolLabels := []string{"beta.kubernetes.io/instance-type", "node.kubernetes.io/instance-type"}
nodePoolLabel := ""
var value string
for k, v := range template.NodeSelector {
for _, supportedNodePoolLabel := range supportedNodePoolLabels {
if k == supportedNodePoolLabel {
nodePoolLabel = k
value = v
break
}
}
}
if value == "" {
return nil
}
if strings.Contains(value, "{{workflow.") {
parts := strings.Split(strings.Replace(value, "}}", "", -1), ".")
paramName := parts[len(parts)-1]
for _, param := range wf.Spec.Arguments.Parameters {
if param.Name == paramName && param.Value != nil {
value = *param.Value
break
}
}
}
runningNodes, err := c.Interface.CoreV1().Nodes().List(ListOptions{})
if err != nil {
return err
}
var cpu string
var memory string
var gpu int64
gpuManufacturer := ""
for _, node := range runningNodes.Items {
if node.Labels[nodePoolLabel] == value {
cpuInt := node.Status.Allocatable.Cpu().MilliValue()
cpu = strconv.FormatFloat(float64(cpuInt)*.9, 'f', 0, 64) + "m"
memoryInt := node.Status.Allocatable.Memory().MilliValue()
kiBase := 1024.0
ninetyPerc := float64(memoryInt) * .9
toKi := ninetyPerc / kiBase / kiBase
memory = strconv.FormatFloat(toKi, 'f', 0, 64) + "Ki"
//Check for Nvidia
gpuQuantity := node.Status.Allocatable["nvidia.com/gpu"]
if gpuQuantity.IsZero() == false {
gpu = gpuQuantity.Value()
gpuManufacturer = "nvidia.com/gpu"
}
//Check for AMD
//Source: https://github.com/RadeonOpenCompute/k8s-device-plugin/blob/master/example/pod/alexnet-gpu.yaml
gpuQuantity = node.Status.Allocatable["amd.com/gpu"]
if gpuQuantity.IsZero() == false {
gpu = gpuQuantity.Value()
gpuManufacturer = "amd.com/gpu"
}
}
}
if cpu != "" && memory != "" {
resourceList := corev1.ResourceRequirements{
Limits: nil,
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse(cpu),
corev1.ResourceMemory: resource.MustParse(memory),
},
}
if gpu > 0 {
//todo disable this, not working with gpu nodes
resourceLimitsRaw := map[string]interface{}{}
resourceLimitsRaw[gpuManufacturer] = gpu
var resourceListGpu corev1.ResourceList
marshal, err := yaml.Marshal(resourceLimitsRaw)
if err != nil {
return err
}
err = yaml.Unmarshal(marshal, &resourceListGpu)
if err != nil {
return err
}
resourceList.Limits = resourceListGpu
}
if template.Container != nil {
template.Container.Resources = resourceList
}
if template.Script != nil {
template.Script.Container.Resources = resourceList
}
}
return nil
}
func injectEnvironmentVariables(container *corev1.Container, systemConfig SystemConfig) {
//Generate ENV vars from secret, if there is a container present in the workflow
//Get template ENV vars, avoid over-writing them with secret values
@@ -275,11 +378,18 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
Name: "sys-dshm",
MountPath: "/dev/shm",
})
err = c.injectContainerResourceQuotas(wf, template, systemConfig)
if err != nil {
return err
}
injectEnvironmentVariables(template.Container, systemConfig)
}
if template.Script != nil {
err = c.injectContainerResourceQuotas(wf, template, systemConfig)
if err != nil {
return err
}
injectEnvironmentVariables(&template.Script.Container, systemConfig)
}