refactor: create outbound pod

This commit is contained in:
naison
2025-01-25 13:50:47 +00:00
parent 9a922ae084
commit 666a69cdfb

View File

@@ -16,10 +16,10 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/cert"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/polymorphichelpers"
@@ -32,9 +32,6 @@ import (
)
func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace string, gvisor bool, imagePullSecretName string) (err error) {
innerIpv4CIDR := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
innerIpv6CIDR := net.IPNet{IP: config.RouterIP6, Mask: config.CIDR6.Mask}
service, err := clientset.CoreV1().Services(namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err == nil {
var pod *v1.Pod
@@ -79,13 +76,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
// 2) create serviceAccount
log.Infof("Creating ServiceAccount %s", config.ConfigMapPodTrafficManager)
_, err = clientset.CoreV1().ServiceAccounts(namespace).Create(ctx, &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
AutomountServiceAccountToken: pointer.Bool(true),
}, metav1.CreateOptions{})
_, err = clientset.CoreV1().ServiceAccounts(namespace).Create(ctx, genServiceAccount(namespace), metav1.CreateOptions{})
if err != nil {
log.Infof("Creating ServiceAccount error: %s", err.Error())
return err
@@ -93,18 +84,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
// 3) create roles
log.Infof("Creating Roles %s", config.ConfigMapPodTrafficManager)
_, err = clientset.RbacV1().Roles(namespace).Create(ctx, &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Rules: []rbacv1.PolicyRule{{
Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete"},
APIGroups: []string{""},
Resources: []string{"configmaps", "secrets"},
ResourceNames: []string{config.ConfigMapPodTrafficManager},
}},
}, metav1.CreateOptions{})
_, err = clientset.RbacV1().Roles(namespace).Create(ctx, genRole(namespace), metav1.CreateOptions{})
if err != nil {
log.Errorf("Creating Roles error: %s", err.Error())
return err
@@ -112,23 +92,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
// 4) create roleBinding
log.Infof("Creating RoleBinding %s", config.ConfigMapPodTrafficManager)
_, err = clientset.RbacV1().RoleBindings(namespace).Create(ctx, &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Subjects: []rbacv1.Subject{{
Kind: "ServiceAccount",
//APIGroup: "rbac.authorization.k8s.io",
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
}},
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "Role",
Name: config.ConfigMapPodTrafficManager,
},
}, metav1.CreateOptions{})
_, err = clientset.RbacV1().RoleBindings(namespace).Create(ctx, genRoleBinding(namespace), metav1.CreateOptions{})
if err != nil {
log.Errorf("Creating RoleBinding error: %s", err.Error())
return err
@@ -141,42 +105,8 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
tcp9002 := "9002-for-envoy"
tcp80 := "80-for-webhook"
udp53 := "53-for-dns"
_, err = clientset.CoreV1().Services(namespace).Create(ctx, &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: udp8422,
Protocol: v1.ProtocolUDP,
Port: 8422,
TargetPort: intstr.FromInt32(8422),
}, {
Name: tcp10800,
Protocol: v1.ProtocolTCP,
Port: 10800,
TargetPort: intstr.FromInt32(10800),
}, {
Name: tcp9002,
Protocol: v1.ProtocolTCP,
Port: 9002,
TargetPort: intstr.FromInt32(9002),
}, {
Name: tcp80,
Protocol: v1.ProtocolTCP,
Port: 80,
TargetPort: intstr.FromInt32(80),
}, {
Name: udp53,
Protocol: v1.ProtocolUDP,
Port: 53,
TargetPort: intstr.FromInt32(53),
}},
Selector: map[string]string{"app": config.ConfigMapPodTrafficManager},
Type: v1.ServiceTypeClusterIP,
},
}, metav1.CreateOptions{})
svcSpec := genService(namespace, udp8422, tcp10800, tcp9002, tcp80, udp53)
_, err = clientset.CoreV1().Services(namespace).Create(ctx, svcSpec, metav1.CreateOptions{})
if err != nil {
log.Errorf("Creating Service error: %s", err.Error())
return err
@@ -192,17 +122,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
// reason why not use v1.SecretTypeTls is because it needs key called tls.crt and tls.key, but tls.key can not as env variable
// ➜ ~ export tls.key=a
//export: not valid in this context: tls.key
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Data: map[string][]byte{
config.TLSCertKey: crt,
config.TLSPrivateKeyKey: key,
},
Type: v1.SecretTypeOpaque,
}
secret := genSecret(namespace, crt, key)
_, err = clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) {
log.Errorf("Creating secret error: %s", err.Error())
@@ -211,7 +131,71 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
// 6) create mutatingWebhookConfigurations
log.Infof("Creating MutatingWebhookConfiguration %s", config.ConfigMapPodTrafficManager)
_, err = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, &admissionv1.MutatingWebhookConfiguration{
mutatingWebhookConfiguration := genMutatingWebhookConfiguration(namespace, crt)
_, err = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, mutatingWebhookConfiguration, metav1.CreateOptions{})
if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create MutatingWebhookConfigurations: %v", err)
}
// 7) create deployment
log.Infof("Creating Deployment %s", config.ConfigMapPodTrafficManager)
deploy := genDeploySpec(namespace, udp8422, tcp10800, tcp9002, udp53, tcp80, gvisor, imagePullSecretName)
deploy, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deploy, metav1.CreateOptions{})
if err != nil {
log.Errorf("Failed to create deployment for %s: %v", config.ConfigMapPodTrafficManager, err)
return err
}
return waitPodReady(ctx, deploy, clientset.CoreV1().Pods(namespace))
}
func genServiceAccount(namespace string) *v1.ServiceAccount {
return &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
AutomountServiceAccountToken: pointer.Bool(true),
}
}
func genRole(namespace string) *rbacv1.Role {
return &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Rules: []rbacv1.PolicyRule{{
Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete"},
APIGroups: []string{""},
Resources: []string{"configmaps", "secrets"},
ResourceNames: []string{config.ConfigMapPodTrafficManager},
}},
}
}
func genRoleBinding(namespace string) *rbacv1.RoleBinding {
return &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Subjects: []rbacv1.Subject{{
Kind: "ServiceAccount",
//APIGroup: "rbac.authorization.k8s.io",
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
}},
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "Role",
Name: config.ConfigMapPodTrafficManager,
},
}
}
func genMutatingWebhookConfiguration(namespace string, crt []byte) *admissionv1.MutatingWebhookConfiguration {
return &admissionv1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager + "." + namespace,
Namespace: namespace,
@@ -254,87 +238,64 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
},
},*/
}},
}, metav1.CreateOptions{})
if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create MutatingWebhookConfigurations: %v", err)
}
// 7) create deployment
log.Infof("Creating Deployment %s", config.ConfigMapPodTrafficManager)
deploy := genDeploySpec(namespace, innerIpv4CIDR, innerIpv6CIDR, udp8422, tcp10800, tcp9002, udp53, tcp80, gvisor, imagePullSecretName)
deploy, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deploy, metav1.CreateOptions{})
if err != nil {
log.Errorf("Failed to create deployment for %s: %v", config.ConfigMapPodTrafficManager, err)
return err
}
str := fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String()
_, selector, err := polymorphichelpers.SelectorsForObject(deploy)
if err == nil {
str = selector.String()
}
watchStream, err := clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: str,
})
if err != nil {
log.Errorf("Failed to create watch for %s: %v", config.ConfigMapPodTrafficManager, err)
return err
}
defer watchStream.Stop()
var ok bool
var last string
ctx2, cancelFunc := context.WithTimeout(ctx, time.Minute*60)
defer cancelFunc()
log.Infoln()
wait.UntilWithContext(ctx2, func(ctx context.Context) {
podList, err := clientset.CoreV1().Pods(namespace).List(ctx2, metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String(),
})
if err != nil {
log.Errorf("Failed to list pods for %s: %v", config.ConfigMapPodTrafficManager, err)
return
}
for _, podT := range podList.Items {
podT := &podT
if podT.DeletionTimestamp != nil {
continue
}
var sb = bytes.NewBuffer(nil)
sb.WriteString(fmt.Sprintf("Pod %s is %s...\n", podT.Name, podT.Status.Phase))
if podT.Status.Reason != "" {
sb.WriteString(fmt.Sprintf(" reason %s", podT.Status.Reason))
}
if podT.Status.Message != "" {
sb.WriteString(fmt.Sprintf(" message %s", podT.Status.Message))
}
util.PrintStatus(podT, sb)
if last != sb.String() {
log.Infof(sb.String())
}
last = sb.String()
if podutils.IsPodReady(podT) && func() bool {
for _, status := range podT.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
}() {
cancelFunc()
ok = true
}
}
}, time.Second*3)
if !ok {
log.Errorf("Wait pod %s to be ready timeout", config.ConfigMapPodTrafficManager)
return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", config.ConfigMapPodTrafficManager))
}
return nil
}
func genDeploySpec(namespace string, innerIpv4CIDR net.IPNet, innerIpv6CIDR net.IPNet, udp8422 string, tcp10800 string, tcp9002 string, udp53 string, tcp80 string, gvisor bool, imagePullSecretName string) *appsv1.Deployment {
func genService(namespace string, udp8422 string, tcp10800 string, tcp9002 string, tcp80 string, udp53 string) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: udp8422,
Protocol: v1.ProtocolUDP,
Port: 8422,
TargetPort: intstr.FromInt32(8422),
}, {
Name: tcp10800,
Protocol: v1.ProtocolTCP,
Port: 10800,
TargetPort: intstr.FromInt32(10800),
}, {
Name: tcp9002,
Protocol: v1.ProtocolTCP,
Port: 9002,
TargetPort: intstr.FromInt32(9002),
}, {
Name: tcp80,
Protocol: v1.ProtocolTCP,
Port: 80,
TargetPort: intstr.FromInt32(80),
}, {
Name: udp53,
Protocol: v1.ProtocolUDP,
Port: 53,
TargetPort: intstr.FromInt32(53),
}},
Selector: map[string]string{"app": config.ConfigMapPodTrafficManager},
Type: v1.ServiceTypeClusterIP,
},
}
}
func genSecret(namespace string, crt []byte, key []byte) *v1.Secret {
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: namespace,
},
Data: map[string][]byte{
config.TLSCertKey: crt,
config.TLSPrivateKeyKey: key,
},
Type: v1.SecretTypeOpaque,
}
return secret
}
func genDeploySpec(namespace string, udp8422 string, tcp10800 string, tcp9002 string, udp53 string, tcp80 string, gvisor bool, imagePullSecretName string) *appsv1.Deployment {
var resourcesSmall = v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("100m"),
@@ -356,6 +317,9 @@ func genDeploySpec(namespace string, innerIpv4CIDR net.IPNet, innerIpv6CIDR net.
},
}
innerIpv4CIDR := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
innerIpv6CIDR := net.IPNet{IP: config.RouterIP6, Mask: config.CIDR6.Mask}
deploy := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
@@ -526,3 +490,63 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
}
return deploy
}
func waitPodReady(ctx context.Context, deploy *appsv1.Deployment, clientset corev1.PodInterface) error {
_, selector, err := polymorphichelpers.SelectorsForObject(deploy)
if err != nil {
return err
}
var isPodReady bool
var lastMessage string
ctx2, cancelFunc := context.WithTimeout(ctx, time.Minute*60)
defer cancelFunc()
log.Infoln()
wait.UntilWithContext(ctx2, func(ctx context.Context) {
podList, err := clientset.List(ctx2, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
log.Errorf("Failed to list pods for %s: %v", deploy.Name, err)
return
}
for _, pod := range podList.Items {
if pod.DeletionTimestamp != nil {
continue
}
var sb = bytes.NewBuffer(nil)
sb.WriteString(fmt.Sprintf("Pod %s is %s...\n", pod.Name, pod.Status.Phase))
if pod.Status.Reason != "" {
sb.WriteString(fmt.Sprintf(" reason %s", pod.Status.Reason))
}
if pod.Status.Message != "" {
sb.WriteString(fmt.Sprintf(" message %s", pod.Status.Message))
}
util.PrintStatus(&pod, sb)
if lastMessage != sb.String() {
log.Infof(sb.String())
}
lastMessage = sb.String()
readyFunc := func(pod *v1.Pod) bool {
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
}
if podutils.IsPodReady(&pod) && readyFunc(&pod) {
cancelFunc()
isPodReady = true
}
}
}, time.Second*3)
if !isPodReady {
log.Errorf("Wait pod %s to be ready timeout", deploy.Name)
return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", deploy.Name))
}
return nil
}