diff --git a/pkg/handler/remote.go b/pkg/handler/remote.go index 07352fda..0f05f683 100644 --- a/pkg/handler/remote.go +++ b/pkg/handler/remote.go @@ -16,10 +16,10 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/cert" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/polymorphichelpers" @@ -32,9 +32,6 @@ import ( ) func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace string, gvisor bool, imagePullSecretName string) (err error) { - innerIpv4CIDR := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask} - innerIpv6CIDR := net.IPNet{IP: config.RouterIP6, Mask: config.CIDR6.Mask} - service, err := clientset.CoreV1().Services(namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) if err == nil { var pod *v1.Pod @@ -79,13 +76,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * // 2) create serviceAccount log.Infof("Creating ServiceAccount %s", config.ConfigMapPodTrafficManager) - _, err = clientset.CoreV1().ServiceAccounts(namespace).Create(ctx, &v1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{ - Name: config.ConfigMapPodTrafficManager, - Namespace: namespace, - }, - AutomountServiceAccountToken: pointer.Bool(true), - }, metav1.CreateOptions{}) + _, err = clientset.CoreV1().ServiceAccounts(namespace).Create(ctx, genServiceAccount(namespace), metav1.CreateOptions{}) if err != nil { log.Infof("Creating ServiceAccount error: %s", err.Error()) return err @@ -93,18 +84,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * // 3) create roles log.Infof("Creating Roles %s", config.ConfigMapPodTrafficManager) - _, err = clientset.RbacV1().Roles(namespace).Create(ctx, &rbacv1.Role{ - ObjectMeta: metav1.ObjectMeta{ - Name: config.ConfigMapPodTrafficManager, - Namespace: namespace, - }, - Rules: []rbacv1.PolicyRule{{ - Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete"}, - APIGroups: []string{""}, - Resources: []string{"configmaps", "secrets"}, - ResourceNames: []string{config.ConfigMapPodTrafficManager}, - }}, - }, metav1.CreateOptions{}) + _, err = clientset.RbacV1().Roles(namespace).Create(ctx, genRole(namespace), metav1.CreateOptions{}) if err != nil { log.Errorf("Creating Roles error: %s", err.Error()) return err @@ -112,23 +92,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * // 4) create roleBinding log.Infof("Creating RoleBinding %s", config.ConfigMapPodTrafficManager) - _, err = clientset.RbacV1().RoleBindings(namespace).Create(ctx, &rbacv1.RoleBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: config.ConfigMapPodTrafficManager, - Namespace: namespace, - }, - Subjects: []rbacv1.Subject{{ - Kind: "ServiceAccount", - //APIGroup: "rbac.authorization.k8s.io", - Name: config.ConfigMapPodTrafficManager, - Namespace: namespace, - }}, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "Role", - Name: config.ConfigMapPodTrafficManager, - }, - }, metav1.CreateOptions{}) + _, err = clientset.RbacV1().RoleBindings(namespace).Create(ctx, genRoleBinding(namespace), metav1.CreateOptions{}) if err != nil { log.Errorf("Creating RoleBinding error: %s", err.Error()) return err @@ -141,42 +105,8 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * tcp9002 := "9002-for-envoy" tcp80 := "80-for-webhook" udp53 := "53-for-dns" - _, err = clientset.CoreV1().Services(namespace).Create(ctx, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: config.ConfigMapPodTrafficManager, - Namespace: namespace, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{{ - Name: udp8422, - Protocol: v1.ProtocolUDP, - Port: 8422, - TargetPort: intstr.FromInt32(8422), - }, { - Name: tcp10800, - Protocol: v1.ProtocolTCP, - Port: 10800, - TargetPort: intstr.FromInt32(10800), - }, { - Name: tcp9002, - Protocol: v1.ProtocolTCP, - Port: 9002, - TargetPort: intstr.FromInt32(9002), - }, { - Name: tcp80, - Protocol: v1.ProtocolTCP, - Port: 80, - TargetPort: intstr.FromInt32(80), - }, { - Name: udp53, - Protocol: v1.ProtocolUDP, - Port: 53, - TargetPort: intstr.FromInt32(53), - }}, - Selector: map[string]string{"app": config.ConfigMapPodTrafficManager}, - Type: v1.ServiceTypeClusterIP, - }, - }, metav1.CreateOptions{}) + svcSpec := genService(namespace, udp8422, tcp10800, tcp9002, tcp80, udp53) + _, err = clientset.CoreV1().Services(namespace).Create(ctx, svcSpec, metav1.CreateOptions{}) if err != nil { log.Errorf("Creating Service error: %s", err.Error()) return err @@ -192,17 +122,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * // reason why not use v1.SecretTypeTls is because it needs key called tls.crt and tls.key, but tls.key can not as env variable // ➜ ~ export tls.key=a //export: not valid in this context: tls.key - secret := &v1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: config.ConfigMapPodTrafficManager, - Namespace: namespace, - }, - Data: map[string][]byte{ - config.TLSCertKey: crt, - config.TLSPrivateKeyKey: key, - }, - Type: v1.SecretTypeOpaque, - } + secret := genSecret(namespace, crt, key) _, err = clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) if err != nil && !k8serrors.IsAlreadyExists(err) { log.Errorf("Creating secret error: %s", err.Error()) @@ -211,7 +131,71 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * // 6) create mutatingWebhookConfigurations log.Infof("Creating MutatingWebhookConfiguration %s", config.ConfigMapPodTrafficManager) - _, err = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, &admissionv1.MutatingWebhookConfiguration{ + mutatingWebhookConfiguration := genMutatingWebhookConfiguration(namespace, crt) + _, err = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, mutatingWebhookConfiguration, metav1.CreateOptions{}) + if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create MutatingWebhookConfigurations: %v", err) + } + + // 7) create deployment + log.Infof("Creating Deployment %s", config.ConfigMapPodTrafficManager) + deploy := genDeploySpec(namespace, udp8422, tcp10800, tcp9002, udp53, tcp80, gvisor, imagePullSecretName) + deploy, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deploy, metav1.CreateOptions{}) + if err != nil { + log.Errorf("Failed to create deployment for %s: %v", config.ConfigMapPodTrafficManager, err) + return err + } + + return waitPodReady(ctx, deploy, clientset.CoreV1().Pods(namespace)) +} + +func genServiceAccount(namespace string) *v1.ServiceAccount { + return &v1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: config.ConfigMapPodTrafficManager, + Namespace: namespace, + }, + AutomountServiceAccountToken: pointer.Bool(true), + } +} + +func genRole(namespace string) *rbacv1.Role { + return &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: config.ConfigMapPodTrafficManager, + Namespace: namespace, + }, + Rules: []rbacv1.PolicyRule{{ + Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete"}, + APIGroups: []string{""}, + Resources: []string{"configmaps", "secrets"}, + ResourceNames: []string{config.ConfigMapPodTrafficManager}, + }}, + } +} + +func genRoleBinding(namespace string) *rbacv1.RoleBinding { + return &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: config.ConfigMapPodTrafficManager, + Namespace: namespace, + }, + Subjects: []rbacv1.Subject{{ + Kind: "ServiceAccount", + //APIGroup: "rbac.authorization.k8s.io", + Name: config.ConfigMapPodTrafficManager, + Namespace: namespace, + }}, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: config.ConfigMapPodTrafficManager, + }, + } +} + +func genMutatingWebhookConfiguration(namespace string, crt []byte) *admissionv1.MutatingWebhookConfiguration { + return &admissionv1.MutatingWebhookConfiguration{ ObjectMeta: metav1.ObjectMeta{ Name: config.ConfigMapPodTrafficManager + "." + namespace, Namespace: namespace, @@ -254,87 +238,64 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * }, },*/ }}, - }, metav1.CreateOptions{}) - if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create MutatingWebhookConfigurations: %v", err) } - - // 7) create deployment - log.Infof("Creating Deployment %s", config.ConfigMapPodTrafficManager) - deploy := genDeploySpec(namespace, innerIpv4CIDR, innerIpv6CIDR, udp8422, tcp10800, tcp9002, udp53, tcp80, gvisor, imagePullSecretName) - deploy, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deploy, metav1.CreateOptions{}) - if err != nil { - log.Errorf("Failed to create deployment for %s: %v", config.ConfigMapPodTrafficManager, err) - return err - } - str := fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String() - _, selector, err := polymorphichelpers.SelectorsForObject(deploy) - if err == nil { - str = selector.String() - } - watchStream, err := clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ - LabelSelector: str, - }) - if err != nil { - log.Errorf("Failed to create watch for %s: %v", config.ConfigMapPodTrafficManager, err) - return err - } - defer watchStream.Stop() - var ok bool - var last string - ctx2, cancelFunc := context.WithTimeout(ctx, time.Minute*60) - defer cancelFunc() - log.Infoln() - wait.UntilWithContext(ctx2, func(ctx context.Context) { - podList, err := clientset.CoreV1().Pods(namespace).List(ctx2, metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String(), - }) - if err != nil { - log.Errorf("Failed to list pods for %s: %v", config.ConfigMapPodTrafficManager, err) - return - } - - for _, podT := range podList.Items { - podT := &podT - if podT.DeletionTimestamp != nil { - continue - } - var sb = bytes.NewBuffer(nil) - sb.WriteString(fmt.Sprintf("Pod %s is %s...\n", podT.Name, podT.Status.Phase)) - if podT.Status.Reason != "" { - sb.WriteString(fmt.Sprintf(" reason %s", podT.Status.Reason)) - } - if podT.Status.Message != "" { - sb.WriteString(fmt.Sprintf(" message %s", podT.Status.Message)) - } - util.PrintStatus(podT, sb) - if last != sb.String() { - log.Infof(sb.String()) - } - last = sb.String() - - if podutils.IsPodReady(podT) && func() bool { - for _, status := range podT.Status.ContainerStatuses { - if !status.Ready { - return false - } - } - return true - }() { - cancelFunc() - ok = true - } - } - }, time.Second*3) - if !ok { - log.Errorf("Wait pod %s to be ready timeout", config.ConfigMapPodTrafficManager) - return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", config.ConfigMapPodTrafficManager)) - } - - return nil } -func genDeploySpec(namespace string, innerIpv4CIDR net.IPNet, innerIpv6CIDR net.IPNet, udp8422 string, tcp10800 string, tcp9002 string, udp53 string, tcp80 string, gvisor bool, imagePullSecretName string) *appsv1.Deployment { +func genService(namespace string, udp8422 string, tcp10800 string, tcp9002 string, tcp80 string, udp53 string) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: config.ConfigMapPodTrafficManager, + Namespace: namespace, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{ + Name: udp8422, + Protocol: v1.ProtocolUDP, + Port: 8422, + TargetPort: intstr.FromInt32(8422), + }, { + Name: tcp10800, + Protocol: v1.ProtocolTCP, + Port: 10800, + TargetPort: intstr.FromInt32(10800), + }, { + Name: tcp9002, + Protocol: v1.ProtocolTCP, + Port: 9002, + TargetPort: intstr.FromInt32(9002), + }, { + Name: tcp80, + Protocol: v1.ProtocolTCP, + Port: 80, + TargetPort: intstr.FromInt32(80), + }, { + Name: udp53, + Protocol: v1.ProtocolUDP, + Port: 53, + TargetPort: intstr.FromInt32(53), + }}, + Selector: map[string]string{"app": config.ConfigMapPodTrafficManager}, + Type: v1.ServiceTypeClusterIP, + }, + } +} + +func genSecret(namespace string, crt []byte, key []byte) *v1.Secret { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: config.ConfigMapPodTrafficManager, + Namespace: namespace, + }, + Data: map[string][]byte{ + config.TLSCertKey: crt, + config.TLSPrivateKeyKey: key, + }, + Type: v1.SecretTypeOpaque, + } + return secret +} + +func genDeploySpec(namespace string, udp8422 string, tcp10800 string, tcp9002 string, udp53 string, tcp80 string, gvisor bool, imagePullSecretName string) *appsv1.Deployment { var resourcesSmall = v1.ResourceRequirements{ Requests: map[v1.ResourceName]resource.Quantity{ v1.ResourceCPU: resource.MustParse("100m"), @@ -356,6 +317,9 @@ func genDeploySpec(namespace string, innerIpv4CIDR net.IPNet, innerIpv6CIDR net. }, } + innerIpv4CIDR := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask} + innerIpv6CIDR := net.IPNet{IP: config.RouterIP6, Mask: config.CIDR6.Mask} + deploy := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: config.ConfigMapPodTrafficManager, @@ -526,3 +490,63 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080 } return deploy } + +func waitPodReady(ctx context.Context, deploy *appsv1.Deployment, clientset corev1.PodInterface) error { + _, selector, err := polymorphichelpers.SelectorsForObject(deploy) + if err != nil { + return err + } + var isPodReady bool + var lastMessage string + ctx2, cancelFunc := context.WithTimeout(ctx, time.Minute*60) + defer cancelFunc() + log.Infoln() + wait.UntilWithContext(ctx2, func(ctx context.Context) { + podList, err := clientset.List(ctx2, metav1.ListOptions{ + LabelSelector: selector.String(), + }) + if err != nil { + log.Errorf("Failed to list pods for %s: %v", deploy.Name, err) + return + } + + for _, pod := range podList.Items { + if pod.DeletionTimestamp != nil { + continue + } + var sb = bytes.NewBuffer(nil) + sb.WriteString(fmt.Sprintf("Pod %s is %s...\n", pod.Name, pod.Status.Phase)) + if pod.Status.Reason != "" { + sb.WriteString(fmt.Sprintf(" reason %s", pod.Status.Reason)) + } + if pod.Status.Message != "" { + sb.WriteString(fmt.Sprintf(" message %s", pod.Status.Message)) + } + util.PrintStatus(&pod, sb) + if lastMessage != sb.String() { + log.Infof(sb.String()) + } + lastMessage = sb.String() + + readyFunc := func(pod *v1.Pod) bool { + for _, status := range pod.Status.ContainerStatuses { + if !status.Ready { + return false + } + } + return true + } + if podutils.IsPodReady(&pod) && readyFunc(&pod) { + cancelFunc() + isPodReady = true + } + } + }, time.Second*3) + + if !isPodReady { + log.Errorf("Wait pod %s to be ready timeout", deploy.Name) + return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", deploy.Name)) + } + + return nil +}