mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-10-29 07:52:29 +08:00
Merge pull request #585 from aleksandrmelnikov/feat/core.583-workflows.use.pod.anti.affinity
feat: Updated Workflows, and Cron created Workflows, to ensure they get their own pod by using Pod AntiAffinity. This replaces resource limits and/or requests.
This commit is contained in:
@@ -429,11 +429,18 @@ func (c *Client) updateCronWorkflow(namespace string, uid string, workflowTempla
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (createdCronWorkflow *wfv1.CronWorkflow, err error) {
|
func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64, wf *wfv1.Workflow, cwf *wfv1.CronWorkflow, opts *WorkflowExecutionOptions) (createdCronWorkflow *wfv1.CronWorkflow, err error) {
|
||||||
|
systemConfig, err := c.GetSystemConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
wf, err = ensureWorkflowRunsOnDedicatedNode(wf, systemConfig)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
cwf, err = c.buildCronWorkflowDefinition(namespace, workflowTemplateId, wf, cwf, opts)
|
cwf, err = c.buildCronWorkflowDefinition(namespace, workflowTemplateId, wf, cwf, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
createdCronWorkflow, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Create(cwf)
|
createdCronWorkflow, err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Create(cwf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -208,49 +208,6 @@ func injectArtifactRepositoryConfig(artifact *wfv1.Artifact, namespaceConfig *Na
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// injectContainerResourceQuotas adds resource requests and limits if they exist
|
|
||||||
func injectContainerResourceQuotas(wf *wfv1.Workflow, template *wfv1.Template, systemConfig SystemConfig) {
|
|
||||||
if template.NodeSelector == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var value string
|
|
||||||
for k, v := range template.NodeSelector {
|
|
||||||
if k == *systemConfig.NodePoolLabel() {
|
|
||||||
value = v
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if value == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if strings.Contains(value, "{{workflow.") {
|
|
||||||
parts := strings.Split(strings.Replace(value, "}}", "", -1), ".")
|
|
||||||
paramName := parts[len(parts)-1]
|
|
||||||
for _, param := range wf.Spec.Arguments.Parameters {
|
|
||||||
if param.Name == paramName && param.Value != nil {
|
|
||||||
value = *param.Value
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
option, err := systemConfig.NodePoolOptionByValue(value)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if option != nil && option.Resources.Limits != nil {
|
|
||||||
// If a node is selected specifically, match the resources request to limits
|
|
||||||
option.Resources.Requests = option.Resources.Limits
|
|
||||||
if template.Container != nil {
|
|
||||||
template.Container.Resources = option.Resources
|
|
||||||
}
|
|
||||||
if template.Script != nil {
|
|
||||||
template.Script.Container.Resources = option.Resources
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func injectEnvironmentVariables(container *corev1.Container, systemConfig SystemConfig) {
|
func injectEnvironmentVariables(container *corev1.Container, systemConfig SystemConfig) {
|
||||||
//Generate ENV vars from secret, if there is a container present in the workflow
|
//Generate ENV vars from secret, if there is a container present in the workflow
|
||||||
//Get template ENV vars, avoid over-writing them with secret values
|
//Get template ENV vars, avoid over-writing them with secret values
|
||||||
@@ -319,14 +276,10 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
|
|||||||
MountPath: "/dev/shm",
|
MountPath: "/dev/shm",
|
||||||
})
|
})
|
||||||
|
|
||||||
injectContainerResourceQuotas(wf, template, systemConfig)
|
|
||||||
|
|
||||||
injectEnvironmentVariables(template.Container, systemConfig)
|
injectEnvironmentVariables(template.Container, systemConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
if template.Script != nil {
|
if template.Script != nil {
|
||||||
injectContainerResourceQuotas(wf, template, systemConfig)
|
|
||||||
|
|
||||||
injectEnvironmentVariables(&template.Script.Container, systemConfig)
|
injectEnvironmentVariables(&template.Script.Container, systemConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -440,7 +393,14 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
|
|||||||
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
|
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
systemConfig, err := c.GetSystemConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
wf, err = ensureWorkflowRunsOnDedicatedNode(wf, systemConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
createdArgoWorkflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
|
createdArgoWorkflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -470,6 +430,51 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ensureWorkflowRunsOnDedicatedNode(wf *wfv1.Workflow, config SystemConfig) (*wfv1.Workflow, error) {
|
||||||
|
antiAffinityLabelKey := "onepanel.io/reserves-instance-type"
|
||||||
|
nodeSelectorVal := ""
|
||||||
|
addPodAffinity := false
|
||||||
|
for i := range wf.Spec.Templates {
|
||||||
|
template := &wf.Spec.Templates[i]
|
||||||
|
if template.NodeSelector == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range template.NodeSelector {
|
||||||
|
if strings.Contains(v, "{{workflow.") {
|
||||||
|
parts := strings.Split(strings.Replace(v, "}}", "", -1), ".")
|
||||||
|
paramName := parts[len(parts)-1]
|
||||||
|
for _, param := range wf.Spec.Arguments.Parameters {
|
||||||
|
if param.Name == paramName && param.Value != nil {
|
||||||
|
nodeSelectorVal = *param.Value
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
template.Metadata.Labels = map[string]string{antiAffinityLabelKey: nodeSelectorVal}
|
||||||
|
addPodAffinity = true
|
||||||
|
}
|
||||||
|
if addPodAffinity {
|
||||||
|
wf.Spec.Affinity = &corev1.Affinity{
|
||||||
|
PodAntiAffinity: &corev1.PodAntiAffinity{
|
||||||
|
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
|
||||||
|
{
|
||||||
|
LabelSelector: &metav1.LabelSelector{
|
||||||
|
MatchExpressions: []metav1.LabelSelectorRequirement{
|
||||||
|
{Key: antiAffinityLabelKey, Operator: "In", Values: []string{nodeSelectorVal}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
TopologyKey: "kubernetes.io/hostname",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return wf, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (err error) {
|
func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (err error) {
|
||||||
manifest, err = filterOutCustomTypesFromManifest(manifest)
|
manifest, err = filterOutCustomTypesFromManifest(manifest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user