support service and crd, but single pod without controller still have bug

This commit is contained in:
p_caiwfeng
2022-01-26 17:23:29 +08:00
parent df217a4b79
commit 961bb954bf
5 changed files with 100 additions and 35 deletions

View File

@@ -11,7 +11,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"net"
"os"
"os/signal"
"strconv"
@@ -22,14 +21,14 @@ var stopChan = make(chan os.Signal)
var rollbackFuncList = make([]func(), 2)
var ctx, cancel = context.WithCancel(context.TODO())
func AddCleanUpResourceHandler(clientset *kubernetes.Clientset, namespace string, manager *DHCPManager, ip ...*net.IPNet) {
func (c *ConnectOptions) addCleanUpResourceHandler(clientset *kubernetes.Clientset, namespace string) {
signal.Notify(stopChan, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL /*, syscall.SIGSTOP*/)
go func() {
<-stopChan
log.Info("prepare to exit, cleaning up")
dns.CancelDNS()
for _, ipNet := range ip {
if err := manager.ReleaseIpToDHCP(ipNet); err != nil {
for _, ipNet := range c.usedIPs {
if err := c.dhcp.ReleaseIpToDHCP(ipNet); err != nil {
log.Errorf("failed to release ip to dhcp, err: %v", err)
}
}

View File

@@ -17,7 +17,6 @@ import (
"k8s.io/kubectl/pkg/polymorphichelpers"
"net"
"strings"
"sync"
"time"
)
@@ -40,8 +39,10 @@ type ConnectOptions struct {
factory cmdutil.Factory
cidrs []*net.IPNet
dhcp *DHCPManager
routerIP net.IP
localTunIP *net.IPNet
// needs to give it back to dhcp
usedIPs []*net.IPNet
routerIP net.IP
localTunIP *net.IPNet
}
func (c *ConnectOptions) createRemoteInboundPod() (err error) {
@@ -51,17 +52,18 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) {
}
tempIps := []*net.IPNet{c.localTunIP}
wg := sync.WaitGroup{}
lock := sync.Mutex{}
//wg := &sync.WaitGroup{}
//lock := &sync.Mutex{}
for _, workload := range c.Workloads {
if len(workload) > 0 {
wg.Add(1)
/*go*/ func(finalWorkload string) {
defer wg.Done()
lock.Lock()
//wg.Add(1)
/*go*/
func(finalWorkload string) {
//defer wg.Done()
//lock.Lock()
virtualShadowIp, _ := c.dhcp.RentIPRandom()
tempIps = append(tempIps, virtualShadowIp)
lock.Unlock()
//lock.Unlock()
config := util.PodRouteConfig{
LocalTunIP: c.localTunIP.IP.String(),
InboundPodTunIP: virtualShadowIp.String(),
@@ -80,12 +82,13 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) {
}(workload)
}
}
wg.Wait()
AddCleanUpResourceHandler(c.clientset, c.Namespace, c.dhcp, tempIps...)
//wg.Wait()
c.usedIPs = tempIps
return
}
func (c *ConnectOptions) DoConnect() (err error) {
c.addCleanUpResourceHandler(c.clientset, c.Namespace)
c.cidrs, err = getCIDR(c.clientset, c.Namespace)
if err != nil {
return
@@ -306,7 +309,7 @@ func (c *ConnectOptions) PreCheckResource() {
for i, workload := range c.Workloads {
ownerReference, err := util.GetTopOwnerReference(c.factory, c.Namespace, workload)
if err == nil {
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.Resource, ownerReference.Name)
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.GroupVersionKind.GroupKind().String(), ownerReference.Name)
}
}
// service which associate with pod
@@ -330,7 +333,7 @@ func (c *ConnectOptions) PreCheckResource() {
if err == nil && list != nil && len(list.Items) != 0 {
ownerReference, err := util.GetTopOwnerReference(c.factory, c.Namespace, fmt.Sprintf("%s/%s", "pods", list.Items[0].Name))
if err == nil {
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.Resource, ownerReference.Name)
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.GroupVersionKind.GroupKind().String(), ownerReference.Name)
}
} else
// if list is empty, means not create pods, just controllers

View File

@@ -9,13 +9,17 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/exchange"
"github.com/wencaiwulue/kubevpn/util"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
pkgresource "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/util/podutils"
@@ -94,19 +98,22 @@ func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string,
PriorityClassName: "system-cluster-critical",
},
}
_, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), manager, metav1.CreateOptions{})
newPod, err := clientset.CoreV1().Pods(namespace).Create(context.TODO(), manager, metav1.CreateOptions{})
if err != nil {
return nil, err
}
watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: manager.GetName()}))
if newPod.Status.Phase == v1.PodRunning {
return net.ParseIP(newPod.Status.PodIP), nil
}
watchStream, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: manager.GetName()}))
if err != nil {
return nil, err
}
defer watch.Stop()
defer watchStream.Stop()
var phase v1.PodPhase
for {
select {
case e := <-watch.ResultChan():
case e := <-watchStream.ResultChan():
if podT, ok := e.Object.(*v1.Pod); ok {
if phase != podT.Status.Phase {
log.Infof("pod %s status is %s", util.TrafficManager, podT.Status.Phase)
@@ -164,26 +171,53 @@ func CreateInboundPod(factory cmdutil.Factory, namespace, workloads string, conf
}); err != nil {
return err
}
// wait for api-server to delete this pods
<-time.Tick(time.Second * 2)
if single, err := helper.WatchSingle(object.Namespace, object.Name, object.ResourceVersion); err == nil {
out:
for {
select {
case e, ok := <-single.ResultChan():
if ok {
if e.Type == watch.Deleted {
single.Stop()
break out
}
}
}
}
}
//// wait for api-server to delete this pods
//<-time.Tick(time.Second * 3)
podTempSpec.Spec.PriorityClassName = ""
if _, err = helper.Create(object.Namespace, true, &v1.Pod{
ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec,
p := &v1.Pod{ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec}
CleanupUselessInfo(p)
if err = retry.OnError(wait.Backoff{
Steps: 10,
Duration: 50 * time.Millisecond,
Factor: 5.0,
Jitter: 1,
}, func(err error) bool {
return !(k8serrors.IsAlreadyExists(err) && !strings.Contains(err.Error(), "object is being deleted"))
}, func() error {
if _, err = helper.Create(object.Namespace, true, p); err != nil {
return err
}
return errors.New("")
}); err != nil {
log.Error(err)
return err
}
rollbackFuncList = append(rollbackFuncList, func() {
if _, err = helper.DeleteWithOptions(object.Namespace, object.Name, &metav1.DeleteOptions{
GracePeriodSeconds: &zero,
}); err != nil {
log.Error(err)
}
// wait for api-server to delete this pods
<-time.Tick(time.Second * 2)
if _, err = helper.Create(object.Namespace, true, &v1.Pod{
ObjectMeta: origin.ObjectMeta, Spec: origin.Spec,
}); err != nil {
//// wait for api-server to delete this pods
//<-time.Tick(time.Second * 2)
p2 := &v1.Pod{ObjectMeta: origin.ObjectMeta, Spec: origin.Spec}
CleanupUselessInfo(p2)
if _, err = helper.Create(object.Namespace, true, p2); err != nil {
log.Error(err)
}
})
@@ -245,3 +279,14 @@ func RemoveInboundPod(factory cmdutil.Factory, namespace, workloads string) erro
})
return err
}
func CleanupUselessInfo(pod *v1.Pod) {
pod.SetSelfLink("")
pod.SetGeneration(0)
pod.SetResourceVersion("")
pod.SetUID("")
pod.SetDeletionTimestamp(nil)
pod.SetSelfLink("")
pod.SetManagedFields(nil)
pod.SetOwnerReferences(nil)
}

View File

@@ -149,6 +149,17 @@ func TestBackoff(t *testing.T) {
})
}
func TestGetCRD(t *testing.T) {
join := filepath.Join(homedir.HomeDir(), ".kube", "nocalhost.large")
configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag()
configFlags.KubeConfig = &join
factory := cmdutil.NewFactory(cmdutil.NewMatchVersionFlags(configFlags))
Namespace, _, _ := factory.ToRawKubeConfigLoader().Namespace()
object, err := util.GetUnstructuredObject(factory, Namespace, "statefulsets.apps.kruise.io/sample-beta1")
fmt.Println(object)
fmt.Println(err)
}
func TestDeleteAndCreate(t *testing.T) {
configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag()
configFlags.KubeConfig = &clientcmd.RecommendedHomeFile
@@ -158,7 +169,7 @@ func TestDeleteAndCreate(t *testing.T) {
//restclient, err := factory.RESTClient()
//clientset, err := factory.KubernetesClientSet()
Namespace, _, err := factory.ToRawKubeConfigLoader().Namespace()
object, err := util.GetUnstructuredObject(factory, Namespace, "pods/nginx")
object, err := util.GetUnstructuredObject(factory, Namespace, "statefulsets.apps.kruise.io/sample-beta1")
u := object.Object.(*unstructured.Unstructured)
var pp v1.Pod

View File

@@ -120,7 +120,14 @@ func GetTopOwnerReference(factory cmdutil.Factory, namespace, workload string) (
if ownerReference == nil {
return object, nil
}
workload = fmt.Sprintf("%s/%s", ownerReference.Kind, ownerReference.Name)
// apiVersion format is Group/Version is like: apps/v1, apps.kruise.io/v1beta1
// we need to trans it to Kind.Group
split := strings.Split(ownerReference.APIVersion, "/")
gk := metav1.GroupKind{
Group: split[0],
Kind: ownerReference.Kind,
}
workload = fmt.Sprintf("%s/%s", gk.String(), ownerReference.Name)
}
}
@@ -132,9 +139,9 @@ func GetTopOwnerReferenceBySelector(factory cmdutil.Factory, namespace, selector
}
set := sets.NewString()
for _, info := range object {
reference, err := GetTopOwnerReference(factory, namespace, fmt.Sprintf("%s/%s", info.Mapping.Resource.Resource, info.Name))
reference, err := GetTopOwnerReference(factory, namespace, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
if err == nil && reference.Mapping.Resource.Resource != "services" {
set.Insert(fmt.Sprintf("%s/%s", reference.Mapping.Resource.Resource, reference.Name))
set.Insert(fmt.Sprintf("%s/%s", reference.Mapping.GroupVersionKind.GroupKind().String(), reference.Name))
}
}
return set, nil