Compare commits

...

12 Commits

Author SHA1 Message Date
Rush Tehrani
0e1e48dfc8 Merge pull request #647 from rushtehrani/master
fix: Use correct group for workspaces resource
2020-10-08 20:03:51 -07:00
rushtehrani
dd0f1f7705 use correct group for workspaces resource 2020-10-08 19:52:48 -07:00
Aleksandr Melnikov
03f8f47664 Merge pull request #642 from Vafilor/fix/jupyterlab.migrations
feat: added convenience method to update workspace template manifest
2020-10-06 10:47:18 -07:00
Andrey Melnikov
c85496d216 update: added method specifically to update the manifest of a workspace template and modified recent migration to use it.
This fixes an issue where the jupyterlab migration wiped out the old description.
2020-10-06 10:28:03 -07:00
Rush Tehrani
5f6415548d Merge pull request #640 from aleksandrmelnikov/fix/core.637-dedicated.nodes.via.hostport
fix: Fixing issues with using hostPort. Removed prior logic that still relied on running nodes.
2020-10-05 15:40:26 -07:00
Aleksandr Melnikov
c641c17a8c Updating code that generates an extra container for a workspace.
- Renamed function to make it clearer what it's doing with the extra container
- Added documentation for the function
- Removed listing nodes code, since we only care if the workspace has
a nodeSelector set.
2020-10-05 14:17:00 -07:00
Aleksandr Melnikov
83a2543b13 Updating function name to reflect what it's doing. 2020-10-05 14:04:32 -07:00
Aleksandr Melnikov
e8dae0f2e9 Since we're no longer relying on running nodes, we don't need logic
relating to them.
- We can just check if a nodeSelector is set on the template.
2020-10-05 13:59:14 -07:00
Rush Tehrani
b85bf4d688 Merge pull request #638 from aleksandrmelnikov/fix/core.637-dedicated.nodes.via.hostport
fix: Replace resource requests and limits with hostPort, as a means of grabbing dedicated nodes.
2020-10-05 12:17:45 -07:00
Aleksandr Melnikov
7fe0ab2654 Tweaking names so it's more clear why they are there. 2020-10-05 11:51:04 -07:00
Aleksandr Melnikov
dfa6eb2fe6 Removing function that's no longer used. 2020-10-05 11:46:19 -07:00
Aleksandr Melnikov
cc2c51ace5 Removing resource requests and limits.
- Using hostPort on the node as a way to require dedicated nodes
for workspaces and workflows.
2020-10-05 11:46:01 -07:00
5 changed files with 45 additions and 183 deletions

View File

@@ -2,7 +2,6 @@ package migration
import (
"database/sql"
v1 "github.com/onepanelio/core/pkg"
uid2 "github.com/onepanelio/core/pkg/util/uid"
"github.com/pressly/goose"
)
@@ -103,13 +102,7 @@ func Up20200929153931(tx *sql.Tx) error {
return err
}
for _, namespace := range namespaces {
workspaceTemplate := &v1.WorkspaceTemplate{
UID: uid,
Name: jupyterLabTemplateName,
Manifest: jupyterWorkspaceTemplate3,
}
if _, err := client.UpdateWorkspaceTemplate(namespace.Name, workspaceTemplate); err != nil {
if _, err := client.UpdateWorkspaceTemplateManifest(namespace.Name, uid, jupyterWorkspaceTemplate3); err != nil {
return err
}
}
@@ -144,14 +137,9 @@ func Down20200929153931(tx *sql.Tx) error {
if err != nil {
return err
}
workspaceTemplate := &v1.WorkspaceTemplate{
UID: uid,
Name: jupyterLabTemplateName,
Manifest: jupyterWorkspaceTemplate2,
}
for _, namespace := range namespaces {
if _, err := client.UpdateWorkspaceTemplate(namespace.Name, workspaceTemplate); err != nil {
if _, err := client.UpdateWorkspaceTemplateManifest(namespace.Name, uid, jupyterWorkspaceTemplate2); err != nil {
return err
}
}

View File

@@ -18,7 +18,6 @@ import (
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/watch"
"net/http"
"strconv"
@@ -197,70 +196,21 @@ func injectArtifactRepositoryConfig(artifact *wfv1.Artifact, namespaceConfig *Na
}
}
// injectContainerResourceQuotas adds resource requests and limits if they exist
// Code grabs the resource request information from the nodeSelector, compared against running nodes.
// If the running node is not present, no resource information is retrieved.
func (c *Client) injectContainerResourceQuotas(wf *wfv1.Workflow, template *wfv1.Template, systemConfig SystemConfig) error {
// injectHostPortToContainer adds a hostPort to the template container, if a nodeSelector is present.
// Kubernetes will ensure that multiple containers with the same hostPort do not share the same node.
func (c *Client) injectHostPortToContainer(template *wfv1.Template) error {
if template.NodeSelector == nil {
return nil
}
supportedNodePoolLabels := []string{"beta.kubernetes.io/instance-type", "node.kubernetes.io/instance-type"}
nodePoolLabel := ""
var value string
for k, v := range template.NodeSelector {
for _, supportedNodePoolLabel := range supportedNodePoolLabels {
if k == supportedNodePoolLabel {
nodePoolLabel = k
value = v
break
}
}
ports := []corev1.ContainerPort{
{Name: "node-capturer", HostPort: 80, ContainerPort: 80},
}
if value == "" {
return nil
if template.Container != nil {
template.Container.Ports = ports
}
if strings.Contains(value, "{{workflow.") {
parts := strings.Split(strings.Replace(value, "}}", "", -1), ".")
paramName := parts[len(parts)-1]
for _, param := range wf.Spec.Arguments.Parameters {
if param.Name == paramName && param.Value != nil {
value = *param.Value
break
}
}
}
runningNodes, err := c.Interface.CoreV1().Nodes().List(ListOptions{})
if err != nil {
return err
}
for _, node := range runningNodes.Items {
if node.Labels[nodePoolLabel] == value {
cpu, memory, gpu, gpuManufacturer := CalculateResourceRequirements(node, nodePoolLabel, value)
if cpu != "" && memory != "" {
resourceList := corev1.ResourceRequirements{
Limits: nil,
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse(cpu),
corev1.ResourceMemory: resource.MustParse(memory),
},
}
if gpu > 0 {
stringGpu := strconv.FormatInt(gpu, 10)
resourceList.Limits = make(map[corev1.ResourceName]resource.Quantity)
resourceList.Limits[corev1.ResourceName(gpuManufacturer)] = resource.MustParse(stringGpu)
}
if template.Container != nil {
template.Container.Resources = resourceList
}
if template.Script != nil {
template.Script.Container.Resources = resourceList
}
//process only one node
return nil
}
}
if template.Script != nil {
template.Script.Container.Ports = ports
}
return nil
}
@@ -332,7 +282,7 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
Name: "sys-dshm",
MountPath: "/dev/shm",
})
err = c.injectContainerResourceQuotas(wf, template, systemConfig)
err = c.injectHostPortToContainer(template)
if err != nil {
return err
}
@@ -340,7 +290,7 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
}
if template.Script != nil {
err = c.injectContainerResourceQuotas(wf, template, systemConfig)
err = c.injectHostPortToContainer(template)
if err != nil {
return err
}

View File

@@ -15,8 +15,6 @@ import (
"github.com/onepanelio/core/pkg/util/request"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
corev1 "k8s.io/api/core/v1"
"strconv"
"strings"
"time"
)
@@ -287,26 +285,7 @@ func (c *Client) addResourceRequestsAndLimitsToWorkspaceTemplate(t wfv1.Template
if !ok {
return nil, errors.New("unable to type check statefulset manifest")
}
//Get node selected
labelKey := "sys-node-pool-label"
labelKeyVal := ""
for _, parameter := range argoTemplate.Spec.Arguments.Parameters {
if parameter.Name == labelKey {
labelKeyVal = *parameter.Value
}
}
nodePoolKey := "sys-node-pool"
nodePoolVal := ""
for _, parameter := range workspace.Parameters {
if parameter.Name == nodePoolKey {
nodePoolVal = *parameter.Value
}
}
extraContainer, err := generateExtraContainerWithResources(c, labelKeyVal, nodePoolVal)
if err != nil {
return nil, err
}
extraContainer := generateExtraContainerWithHostPortToSequesterNode()
if extraContainer != nil {
containers, ok := templateSpec["containers"].([]interface{})
if !ok {
@@ -322,93 +301,25 @@ func (c *Client) addResourceRequestsAndLimitsToWorkspaceTemplate(t wfv1.Template
return resultManifest, nil
}
// generateExtraContainerWithResources will add an extra container to a workspace.
// The extra container will have the calculated resource request for the node selected by the workspace.
// generateExtraContainerWithHostPortToSequesterNode will add an extra container to a workspace.
// The extra container has a hostPort set. Kubernetes will ensure the hostPort does not conflict
// between containers, scheduling a new node as needed.
// The container will sleep once started, and generally consume negligible resources.
//
// The node that was selected has to be already running, in order to get the resource request correct.
func generateExtraContainerWithResources(c *Client, labelKeyVal string, nodePoolVal string) (map[string]interface{}, error) {
runningNodes, err := c.Interface.CoreV1().Nodes().List(ListOptions{})
if err != nil {
return nil, err
func generateExtraContainerWithHostPortToSequesterNode() map[string]interface{} {
extraContainer := map[string]interface{}{
"image": "alpine:latest",
"name": "node-capturer",
"command": []interface{}{"/bin/sh"},
"args": []interface{}{"-c", "while :; do sleep 2073600; done"},
"ports": []interface{}{
map[string]interface{}{
"name": "node-capturer",
"hostPort": 80,
"containerPort": 80,
},
},
}
for _, node := range runningNodes.Items {
cpu, memory, gpu, gpuManufacturer := CalculateResourceRequirements(node, labelKeyVal, nodePoolVal)
if cpu != "" && memory != "" {
extraContainer := map[string]interface{}{
"image": "alpine:latest",
"name": "resource-requester",
"command": []interface{}{"/bin/sh"},
"args": []interface{}{"-c", "while :; do sleep 2073600; done"},
"resources": map[string]interface{}{
"requests": map[string]interface{}{
"cpu": cpu,
"memory": memory,
},
"limits": map[string]interface{}{},
},
}
if gpu > 0 {
res, ok := extraContainer["resources"].(map[string]interface{})
if !ok {
return nil, errors.New("unable to type check extraContainer")
}
reqs, ok := res["requests"].(map[string]interface{})
if !ok {
return nil, errors.New("unable to type check extraContainer")
}
reqs[gpuManufacturer] = gpu
limits, ok := res["limits"].(map[string]interface{})
if !ok {
return nil, errors.New("unable to type check extraContainer")
}
limits[gpuManufacturer] = gpu
}
//process only one node
return extraContainer, err
}
}
return nil, nil
}
// CalculateResourceRequirements will take the passed in Node and try to figure out the
// allocatable capacity. Once the capacity is discovered, function figures out how to request 90%.
// - We use 90% because manual calculation is error prone and not necessarily reliable.
// Params:
// k8sInstanceTypeLabel - Such as 'beta.kubernetes.io/instance-type'
// nodeSelectorValue - Server/VM Type Name, Such as "Standard_NC6", on Azure.
func CalculateResourceRequirements(node corev1.Node, k8sInstanceTypeLabel string, nodeSelectorValue string) (string, string, int64, string) {
var cpu string
var memory string
var gpu int64
gpuManufacturer := ""
if node.Labels[k8sInstanceTypeLabel] == nodeSelectorValue {
cpuInt := node.Status.Allocatable.Cpu().MilliValue()
cpu = strconv.FormatFloat(float64(cpuInt)*.9, 'f', 0, 64) + "m"
memoryInt := node.Status.Allocatable.Memory().MilliValue()
kiBase := 1024.0
ninetyPerc := float64(memoryInt) * .9
toKi := ninetyPerc / kiBase / kiBase
memory = strconv.FormatFloat(toKi, 'f', 0, 64) + "Ki"
//Check for Nvidia
gpuQuantity := node.Status.Allocatable["nvidia.com/gpu"]
if gpuQuantity.IsZero() == false {
gpu = gpuQuantity.Value()
gpuManufacturer = "nvidia.com/gpu"
}
//Check for AMD
//Source: https://github.com/RadeonOpenCompute/k8s-device-plugin/blob/master/example/pod/alexnet-gpu.yaml
gpuQuantity = node.Status.Allocatable["amd.com/gpu"]
if gpuQuantity.IsZero() == false {
gpu = gpuQuantity.Value()
gpuManufacturer = "amd.com/gpu"
}
}
return cpu, memory, gpu, gpuManufacturer
return extraContainer
}
// startWorkspace starts a workspace and related resources. It assumes a DB record already exists

View File

@@ -1158,6 +1158,19 @@ func (c *Client) UpdateWorkspaceTemplate(namespace string, workspaceTemplate *Wo
return workspaceTemplate, nil
}
// UpdateWorkspaceTemplateManifest swaps in a new manifest for an existing workspace template.
//
// The template identified by uid is loaded from the namespace through GetWorkspaceTemplate
// (called with version 0 — presumably "latest"; confirm against GetWorkspaceTemplate), its
// UID and Manifest fields are overwritten, and the result is passed to UpdateWorkspaceTemplate.
// All other fields of the loaded template are handed on unchanged.
//
// Any error from GetWorkspaceTemplate is returned as-is with a nil template; otherwise the
// return values are exactly those of UpdateWorkspaceTemplate.
func (c *Client) UpdateWorkspaceTemplateManifest(namespace, uid string, manifest string) (*WorkspaceTemplate, error) {
	current, err := c.GetWorkspaceTemplate(namespace, uid, 0)
	if err != nil {
		return nil, err
	}

	// Only these two fields are touched; everything else comes from the loaded template.
	current.Manifest = manifest
	current.UID = uid

	return c.UpdateWorkspaceTemplate(namespace, current)
}
// ListWorkspaceTemplates returns a list of workspace templates that are not archived, sorted by most recent created first
func (c *Client) ListWorkspaceTemplates(namespace string, request *request.Request) (workspaceTemplates []*WorkspaceTemplate, err error) {
sb := c.workspaceTemplatesSelectBuilder(namespace).

View File

@@ -363,7 +363,7 @@ func (s *WorkspaceServer) RetryLastWorkspaceAction(ctx context.Context, req *api
func (s *WorkspaceServer) GetWorkspaceStatisticsForNamespace(ctx context.Context, req *api.GetWorkspaceStatisticsForNamespaceRequest) (*api.GetWorkspaceStatisticsForNamespaceResponse, error) {
client := getClient(ctx)
allowed, err := auth.IsAuthorized(client, req.Namespace, "list", "argoproj.io", "workspaces", "")
allowed, err := auth.IsAuthorized(client, req.Namespace, "list", "onepanel.io", "workspaces", "")
if err != nil || !allowed {
return nil, err
}