Merge branch 'master' into chore/removing.unused.function

This commit is contained in:
Aleksandr Melnikov
2020-11-13 10:17:35 -08:00
committed by GitHub
4 changed files with 282 additions and 6 deletions

View File

@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/google/uuid"
"github.com/onepanelio/core/pkg/util/gcs"
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/ptr"
@@ -18,8 +19,11 @@ import (
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
networking "istio.io/api/networking/v1alpha3"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"net/http"
yaml2 "sigs.k8s.io/yaml"
"strconv"
"strings"
"time"
@@ -275,6 +279,15 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
template.Metadata.Annotations = make(map[string]string)
}
template.Metadata.Annotations["sidecar.istio.io/inject"] = "false"
//For workflows with accessible sidecars, we need istio
//Istio does not prevent the main container from stopping
for _, s := range template.Sidecars {
if s.TTY == true {
template.Metadata.Annotations["sidecar.istio.io/inject"] = "true"
//Only need one instance to require istio injection
break
}
}
if template.Container != nil {
// Mount dev/shm
@@ -408,6 +421,11 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
return nil, err
}
newTemplateOrder, err := c.injectAccessForSidecars(namespace, wf)
if err != nil {
return nil, err
}
wf.Spec.Templates = newTemplateOrder
createdArgoWorkflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
if err != nil {
return nil, err
@@ -437,6 +455,260 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
return
}
func (c *Client) injectAccessForSidecars(namespace string, wf *wfv1.Workflow) ([]wfv1.Template, error) {
var newTemplateOrder []wfv1.Template
taskSysSendStatusName := "sys-send-status"
taskSysSendExitStats := "sys-send-exit-stats"
for tIdx, t := range wf.Spec.Templates {
//Inject services, virtual routes
for si, s := range t.Sidecars {
//If TTY is true, sidecar needs to be accessible by HTTP
//Otherwise, we skip the sidecar
if s.TTY != true {
continue
}
if len(s.Ports) == 0 {
msg := fmt.Sprintf("sidecar %s must have at least one port.", s.Name)
return nil, util.NewUserError(codes.InvalidArgument, msg)
}
t.Sidecars[si].MirrorVolumeMounts = ptr.Bool(true)
serviceNameUID := "s" + uuid.New().String() + "--" + namespace
serviceNameUIDDNSCompliant, err := uid2.GenerateUID(serviceNameUID, 63)
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
serviceName := serviceNameUIDDNSCompliant + "." + *c.systemConfig.Domain()
serviceTemplateName := uuid.New().String()
serviceTemplateNameAdd := "sys-k8s-service-template-add-" + serviceTemplateName
serviceTemplateNameDelete := "sys-k8s-service-template-delete-" + serviceTemplateName
serviceTaskName := "service-" + uuid.New().String()
serviceAddTaskName := "sys-add-" + serviceTaskName
serviceDeleteTaskName := "sys-delete-" + serviceTaskName
virtualServiceTemplateName := uuid.New().String()
virtualServiceTemplateNameAdd := "sys-k8s-virtual-service-template-add-" + virtualServiceTemplateName
virtualServiceTemplateNameDelete := "sys-k8s-virtual-service-template-delete-" + virtualServiceTemplateName
virtualServiceTaskName := "virtual-service-" + uuid.New().String()
virtualServiceAddTaskName := "sys-add-" + virtualServiceTaskName
virtualServiceDeleteTaskName := "sys-delete-" + virtualServiceTaskName
var servicePorts []corev1.ServicePort
var routes []*networking.HTTPRoute
for _, port := range s.Ports {
servicePort := corev1.ServicePort{
Name: port.Name,
Protocol: port.Protocol,
Port: port.ContainerPort,
TargetPort: intstr.FromInt(int(port.ContainerPort)),
}
servicePorts = append(servicePorts, servicePort)
route := networking.HTTPRoute{
Match: []*networking.HTTPMatchRequest{
{
Uri: &networking.StringMatch{
MatchType: &networking.StringMatch_Prefix{
Prefix: "/"},
},
},
},
Route: []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: serviceNameUIDDNSCompliant,
Port: &networking.PortSelector{
Number: uint32(port.ContainerPort),
},
},
},
},
}
routes = append(routes, &route)
}
service := corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: serviceNameUIDDNSCompliant,
},
Spec: corev1.ServiceSpec{
Ports: servicePorts,
Selector: map[string]string{
serviceTaskName: serviceNameUIDDNSCompliant,
},
},
}
//Istio needs to know which pod to setup the route to
if wf.Spec.Templates[tIdx].Metadata.Labels == nil {
wf.Spec.Templates[tIdx].Metadata.Labels = make(map[string]string)
}
wf.Spec.Templates[tIdx].Metadata.Labels[serviceTaskName] = serviceNameUIDDNSCompliant
serviceManifestBytes, err := yaml2.Marshal(service)
if err != nil {
return nil, err
}
serviceManifest := string(serviceManifestBytes)
templateServiceResource := wfv1.Template{
Name: serviceTemplateNameAdd,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "create",
Manifest: serviceManifest,
},
}
newTemplateOrder = append(newTemplateOrder, templateServiceResource)
//routes
virtualServiceNameUUID := "vs-" + uuid.New().String()
hosts := []string{serviceName}
wf.Spec.Templates[tIdx].Outputs.Parameters = append(wf.Spec.Templates[tIdx].Outputs.Parameters,
wfv1.Parameter{Name: "sys-sidecar-url--" + s.Name, Value: &serviceName},
)
virtualService := map[string]interface{}{
"apiVersion": "networking.istio.io/v1alpha3",
"kind": "VirtualService",
"metadata": metav1.ObjectMeta{
Name: virtualServiceNameUUID,
},
"spec": networking.VirtualService{
Http: routes,
Gateways: []string{"istio-system/ingressgateway"},
Hosts: hosts,
},
}
virtualServiceManifestBytes, err := yaml2.Marshal(virtualService)
if err != nil {
return nil, err
}
virtualServiceManifest := string(virtualServiceManifestBytes)
templateRouteResource := wfv1.Template{
Name: virtualServiceTemplateNameAdd,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "create",
Manifest: virtualServiceManifest,
},
}
newTemplateOrder = append(newTemplateOrder, templateRouteResource)
for i2, t2 := range wf.Spec.Templates {
if t2.Name == wf.Spec.Entrypoint {
if t2.DAG != nil {
tasks := wf.Spec.Templates[i2].DAG.Tasks
t := tasks[0]
sysDepFound := false
for _, d := range t.Dependencies {
if d == taskSysSendStatusName {
sysDepFound = true
wf.Spec.Templates[i2].DAG.Tasks[0].Dependencies =
[]string{virtualServiceAddTaskName}
}
}
if sysDepFound == false {
wf.Spec.Templates[i2].DAG.Tasks[0].Dependencies = append(wf.Spec.Templates[i2].DAG.Tasks[0].Dependencies, virtualServiceAddTaskName)
}
wf.Spec.Templates[i2].DAG.Tasks = append(tasks, []wfv1.DAGTask{
{
Name: serviceAddTaskName,
Template: serviceTemplateNameAdd,
Dependencies: []string{taskSysSendStatusName},
},
{
Name: virtualServiceAddTaskName,
Template: virtualServiceTemplateNameAdd,
Dependencies: []string{serviceAddTaskName},
},
}...)
}
}
}
//Inject clean-up for service and virtualservice
templateServiceDeleteResource := wfv1.Template{
Name: serviceTemplateNameDelete,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "delete",
Manifest: serviceManifest,
},
}
newTemplateOrder = append(newTemplateOrder, templateServiceDeleteResource)
templateRouteDeleteResource := wfv1.Template{
Name: virtualServiceTemplateNameDelete,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "delete",
Manifest: virtualServiceManifest,
},
}
newTemplateOrder = append(newTemplateOrder, templateRouteDeleteResource)
dagTasks := []wfv1.DAGTask{
{
Name: serviceDeleteTaskName,
Template: serviceTemplateNameDelete,
},
{
Name: virtualServiceDeleteTaskName,
Template: virtualServiceTemplateNameDelete,
Dependencies: []string{serviceDeleteTaskName},
},
}
if wf.Spec.OnExit != "" {
for _, t := range wf.Spec.Templates {
if t.Name == wf.Spec.OnExit {
t.DAG.Tasks = append(t.DAG.Tasks, dagTasks...)
sysExitDepFound := false
for dti, dt := range t.DAG.Tasks {
if dt.Name == taskSysSendExitStats {
sysExitDepFound = true
t.DAG.Tasks[dti].Dependencies = append(t.DAG.Tasks[dti].Dependencies, virtualServiceDeleteTaskName)
}
}
if sysExitDepFound == false {
t.DAG.Tasks[0].Dependencies = append(t.DAG.Tasks[0].Dependencies, virtualServiceDeleteTaskName)
}
break
}
}
} else {
exitHandlerDAG := wfv1.Template{
Name: "exit-handler",
DAG: &wfv1.DAGTemplate{
Tasks: dagTasks,
},
}
wf.Spec.OnExit = "exit-handler"
wf.Spec.Templates = append(wf.Spec.Templates, exitHandlerDAG)
}
}
newTemplateOrder = append(newTemplateOrder, wf.Spec.Templates[tIdx])
}
return newTemplateOrder, nil
}
func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (err error) {
manifest, err = filterOutCustomTypesFromManifest(manifest)
if err != nil {
@@ -588,7 +860,7 @@ func (c *Client) createWorkflowExecutionDB(namespace string, workflowExecution *
return
}
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) {
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, phase wfv1.NodePhase, startedAt time.Time) (err error) {
_, err = sb.Update("workflow_executions").
SetMap(sq.Eq{
"started_at": startedAt.UTC(),
@@ -597,7 +869,10 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name
"finished_at": time.Now().UTC(),
"phase": phase,
}).
Where(sq.Eq{"name": name}).
Where(sq.And{
sq.Eq{"name": name},
sq.NotEq{"phase": "Terminated"},
}).
RunWith(c.DB).
Exec()
@@ -1207,7 +1482,7 @@ func (c *Client) TerminateWorkflowExecution(namespace, uid string) (err error) {
return err
}
err = argoutil.TerminateWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid)
err = argoutil.StopWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid, "", "")
return
}