diff --git a/pkg/cleaner.go b/pkg/cleaner.go index e1d987e8..1eda8b63 100644 --- a/pkg/cleaner.go +++ b/pkg/cleaner.go @@ -11,7 +11,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" - "net" "os" "os/signal" "strconv" @@ -22,14 +21,14 @@ var stopChan = make(chan os.Signal) var rollbackFuncList = make([]func(), 2) var ctx, cancel = context.WithCancel(context.TODO()) -func AddCleanUpResourceHandler(clientset *kubernetes.Clientset, namespace string, manager *DHCPManager, ip ...*net.IPNet) { +func (c *ConnectOptions) addCleanUpResourceHandler(clientset *kubernetes.Clientset, namespace string) { signal.Notify(stopChan, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL /*, syscall.SIGSTOP*/) go func() { <-stopChan log.Info("prepare to exit, cleaning up") dns.CancelDNS() - for _, ipNet := range ip { - if err := manager.ReleaseIpToDHCP(ipNet); err != nil { + for _, ipNet := range c.usedIPs { + if err := c.dhcp.ReleaseIpToDHCP(ipNet); err != nil { log.Errorf("failed to release ip to dhcp, err: %v", err) } } diff --git a/pkg/connect.go b/pkg/connect.go index 51caa8e2..d89eb0c9 100644 --- a/pkg/connect.go +++ b/pkg/connect.go @@ -17,7 +17,6 @@ import ( "k8s.io/kubectl/pkg/polymorphichelpers" "net" "strings" - "sync" "time" ) @@ -40,8 +39,10 @@ type ConnectOptions struct { factory cmdutil.Factory cidrs []*net.IPNet dhcp *DHCPManager - routerIP net.IP - localTunIP *net.IPNet + // needs to give it back to dhcp + usedIPs []*net.IPNet + routerIP net.IP + localTunIP *net.IPNet } func (c *ConnectOptions) createRemoteInboundPod() (err error) { @@ -51,17 +52,18 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) { } tempIps := []*net.IPNet{c.localTunIP} - wg := sync.WaitGroup{} - lock := sync.Mutex{} + //wg := &sync.WaitGroup{} + //lock := &sync.Mutex{} for _, workload := range c.Workloads { if len(workload) > 0 { - wg.Add(1) - /*go*/ func(finalWorkload string) { - defer wg.Done() - lock.Lock() + //wg.Add(1) + /*go*/ + func(finalWorkload string) { + //defer wg.Done() + //lock.Lock() virtualShadowIp, _ := c.dhcp.RentIPRandom() tempIps = append(tempIps, virtualShadowIp) - lock.Unlock() + //lock.Unlock() config := util.PodRouteConfig{ LocalTunIP: c.localTunIP.IP.String(), InboundPodTunIP: virtualShadowIp.String(), @@ -80,12 +82,13 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) { }(workload) } } - wg.Wait() - AddCleanUpResourceHandler(c.clientset, c.Namespace, c.dhcp, tempIps...) + //wg.Wait() + c.usedIPs = tempIps return } func (c *ConnectOptions) DoConnect() (err error) { + c.addCleanUpResourceHandler(c.clientset, c.Namespace) c.cidrs, err = getCIDR(c.clientset, c.Namespace) if err != nil { return @@ -306,7 +309,7 @@ func (c *ConnectOptions) PreCheckResource() { for i, workload := range c.Workloads { ownerReference, err := util.GetTopOwnerReference(c.factory, c.Namespace, workload) if err == nil { - c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.Resource, ownerReference.Name) + c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.GroupVersionKind.GroupKind().String(), ownerReference.Name) } } // service which associate with pod @@ -330,7 +333,7 @@ func (c *ConnectOptions) PreCheckResource() { if err == nil && list != nil && len(list.Items) != 0 { ownerReference, err := util.GetTopOwnerReference(c.factory, c.Namespace, fmt.Sprintf("%s/%s", "pods", list.Items[0].Name)) if err == nil { - c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.Resource, ownerReference.Name) + c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.GroupVersionKind.GroupKind().String(), ownerReference.Name) } } else // if list is empty, means not create pods, just controllers diff --git a/pkg/remote.go b/pkg/remote.go index 1cebe193..7c7c174d 100644 --- a/pkg/remote.go +++ b/pkg/remote.go @@ -9,13 +9,17 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/exchange" "github.com/wencaiwulue/kubevpn/util" v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" pkgresource "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/polymorphichelpers" "k8s.io/kubectl/pkg/util/podutils" @@ -94,19 +98,22 @@ func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string, PriorityClassName: "system-cluster-critical", }, } - _, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), manager, metav1.CreateOptions{}) + newPod, err := clientset.CoreV1().Pods(namespace).Create(context.TODO(), manager, metav1.CreateOptions{}) if err != nil { return nil, err } - watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: manager.GetName()})) + if newPod.Status.Phase == v1.PodRunning { + return net.ParseIP(newPod.Status.PodIP), nil + } + watchStream, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: manager.GetName()})) if err != nil { return nil, err } - defer watch.Stop() + defer watchStream.Stop() var phase v1.PodPhase for { select { - case e := <-watch.ResultChan(): + case e := <-watchStream.ResultChan(): if podT, ok := e.Object.(*v1.Pod); ok { if phase != podT.Status.Phase { log.Infof("pod %s status is %s", util.TrafficManager, podT.Status.Phase) @@ -164,26 +171,53 @@ func CreateInboundPod(factory cmdutil.Factory, namespace, workloads string, conf }); err != nil { return err } - // wait for api-server to delete this pods - <-time.Tick(time.Second * 2) + if single, err := helper.WatchSingle(object.Namespace, object.Name, object.ResourceVersion); err == nil { + out: + for { + select { + case e, ok := <-single.ResultChan(): + if ok { + if e.Type == watch.Deleted { + single.Stop() + break out + } + } + } + } + } + //// wait for api-server to delete this pods + //<-time.Tick(time.Second * 3) podTempSpec.Spec.PriorityClassName = "" - if _, err = helper.Create(object.Namespace, true, &v1.Pod{ - ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec, + p := &v1.Pod{ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec} + CleanupUselessInfo(p) + if err = retry.OnError(wait.Backoff{ + Steps: 10, + Duration: 50 * time.Millisecond, + Factor: 5.0, + Jitter: 1, + }, func(err error) bool { + return !(k8serrors.IsAlreadyExists(err) && !strings.Contains(err.Error(), "object is being deleted")) + }, func() error { + if _, err = helper.Create(object.Namespace, true, p); err != nil { + return err + } + return errors.New("") }); err != nil { log.Error(err) return err } + rollbackFuncList = append(rollbackFuncList, func() { if _, err = helper.DeleteWithOptions(object.Namespace, object.Name, &metav1.DeleteOptions{ GracePeriodSeconds: &zero, }); err != nil { log.Error(err) } - // wait for api-server to delete this pods - <-time.Tick(time.Second * 2) - if _, err = helper.Create(object.Namespace, true, &v1.Pod{ - ObjectMeta: origin.ObjectMeta, Spec: origin.Spec, - }); err != nil { + //// wait for api-server to delete this pods + //<-time.Tick(time.Second * 2) + p2 := &v1.Pod{ObjectMeta: origin.ObjectMeta, Spec: origin.Spec} + CleanupUselessInfo(p2) + if _, err = helper.Create(object.Namespace, true, p2); err != nil { log.Error(err) } }) @@ -245,3 +279,14 @@ func RemoveInboundPod(factory cmdutil.Factory, namespace, workloads string) erro }) return err } + +func CleanupUselessInfo(pod *v1.Pod) { + pod.SetSelfLink("") + pod.SetGeneration(0) + pod.SetResourceVersion("") + pod.SetUID("") + pod.SetDeletionTimestamp(nil) + pod.SetSelfLink("") + pod.SetManagedFields(nil) + pod.SetOwnerReferences(nil) +} diff --git a/pkg/remote_test.go b/pkg/remote_test.go index 601d1cfe..25f2cfcf 100644 --- a/pkg/remote_test.go +++ b/pkg/remote_test.go @@ -149,6 +149,17 @@ func TestBackoff(t *testing.T) { }) } +func TestGetCRD(t *testing.T) { + join := filepath.Join(homedir.HomeDir(), ".kube", "nocalhost.large") + configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag() + configFlags.KubeConfig = &join + factory := cmdutil.NewFactory(cmdutil.NewMatchVersionFlags(configFlags)) + Namespace, _, _ := factory.ToRawKubeConfigLoader().Namespace() + object, err := util.GetUnstructuredObject(factory, Namespace, "statefulsets.apps.kruise.io/sample-beta1") + fmt.Println(object) + fmt.Println(err) +} + func TestDeleteAndCreate(t *testing.T) { configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag() configFlags.KubeConfig = &clientcmd.RecommendedHomeFile @@ -158,7 +169,7 @@ func TestDeleteAndCreate(t *testing.T) { //restclient, err := factory.RESTClient() //clientset, err := factory.KubernetesClientSet() Namespace, _, err := factory.ToRawKubeConfigLoader().Namespace() - object, err := util.GetUnstructuredObject(factory, Namespace, "pods/nginx") + object, err := util.GetUnstructuredObject(factory, Namespace, "statefulsets.apps.kruise.io/sample-beta1") u := object.Object.(*unstructured.Unstructured) var pp v1.Pod diff --git a/util/util.go b/util/util.go index 72e79908..76c961ba 100644 --- a/util/util.go +++ b/util/util.go @@ -120,7 +120,14 @@ func GetTopOwnerReference(factory cmdutil.Factory, namespace, workload string) ( if ownerReference == nil { return object, nil } - workload = fmt.Sprintf("%s/%s", ownerReference.Kind, ownerReference.Name) + // apiVersion format is Group/Version is like: apps/v1, apps.kruise.io/v1beta1 + // we need to trans it to Kind.Group + split := strings.Split(ownerReference.APIVersion, "/") + gk := metav1.GroupKind{ + Group: split[0], + Kind: ownerReference.Kind, + } + workload = fmt.Sprintf("%s/%s", gk.String(), ownerReference.Name) } } @@ -132,9 +139,9 @@ func GetTopOwnerReferenceBySelector(factory cmdutil.Factory, namespace, selector } set := sets.NewString() for _, info := range object { - reference, err := GetTopOwnerReference(factory, namespace, fmt.Sprintf("%s/%s", info.Mapping.Resource.Resource, info.Name)) + reference, err := GetTopOwnerReference(factory, namespace, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name)) if err == nil && reference.Mapping.Resource.Resource != "services" { - set.Insert(fmt.Sprintf("%s/%s", reference.Mapping.Resource.Resource, reference.Name)) + set.Insert(fmt.Sprintf("%s/%s", reference.Mapping.GroupVersionKind.GroupKind().String(), reference.Name)) } } return set, nil