feat: patch once and restore probe by last user exit

This commit is contained in:
wencaiwulue
2022-11-03 21:18:47 +08:00
committed by wencaiwulue
parent 2066e313b1
commit cad0f586f6
12 changed files with 289 additions and 225 deletions

View File

@@ -24,7 +24,6 @@ func init() {
connectCmd.Flags().StringVar(&connect.KubeconfigPath, "kubeconfig", clientcmd.RecommendedHomeFile, "kubeconfig") connectCmd.Flags().StringVar(&connect.KubeconfigPath, "kubeconfig", clientcmd.RecommendedHomeFile, "kubeconfig")
connectCmd.Flags().StringVarP(&connect.Namespace, "namespace", "n", "", "namespace") connectCmd.Flags().StringVarP(&connect.Namespace, "namespace", "n", "", "namespace")
connectCmd.PersistentFlags().StringArrayVar(&connect.Workloads, "workloads", []string{}, "workloads, like: pods/tomcat, deployment/nginx, replicaset/tomcat...") connectCmd.PersistentFlags().StringArrayVar(&connect.Workloads, "workloads", []string{}, "workloads, like: pods/tomcat, deployment/nginx, replicaset/tomcat...")
connectCmd.Flags().StringVar((*string)(&connect.Mode), "mode", string(handler.Reverse), "mode(reverse/mesh), reverse: proxy all traffic into local, mesh: proxy traffic with special headers into local")
connectCmd.Flags().StringToStringVarP(&connect.Headers, "headers", "H", map[string]string{}, "headers, format is k=v, like: k1=v1,k2=v2") connectCmd.Flags().StringToStringVarP(&connect.Headers, "headers", "H", map[string]string{}, "headers, format is k=v, like: k1=v1,k2=v2")
connectCmd.Flags().BoolVar(&config.Debug, "debug", false, "true/false") connectCmd.Flags().BoolVar(&config.Debug, "debug", false, "true/false")
RootCmd.AddCommand(connectCmd) RootCmd.AddCommand(connectCmd)

View File

@@ -30,7 +30,8 @@ var ServerCmd = &cobra.Command{
go func() { log.Info(http.ListenAndServe("localhost:6060", nil)) }() go func() { log.Info(http.ListenAndServe("localhost:6060", nil)) }()
}, },
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
if err := handler.Start(context.TODO(), route); err != nil { err := handler.Start(context.TODO(), route)
if err != nil {
log.Fatal(err) log.Fatal(err)
} }
select {} select {}

View File

@@ -2,6 +2,7 @@ package controlplane
import ( import (
"log" "log"
"time"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
) )
@@ -25,44 +26,45 @@ func Watch(directory string, notifyCh chan<- NotifyMessage) {
log.Fatal(err) log.Fatal(err)
} }
defer watcher.Close() defer watcher.Close()
done := make(chan bool)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
notifyCh <- NotifyMessage{
Operation: Modify,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Create == fsnotify.Create {
notifyCh <- NotifyMessage{
Operation: Create,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
notifyCh <- NotifyMessage{
Operation: Remove,
FilePath: event.Name,
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}()
err = watcher.Add(directory) err = watcher.Add(directory)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
<-done
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
notifyCh <- NotifyMessage{
Operation: Modify,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Create == fsnotify.Create {
notifyCh <- NotifyMessage{
Operation: Create,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
notifyCh <- NotifyMessage{
Operation: Remove,
FilePath: event.Name,
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
case <-time.Tick(time.Second * 3):
notifyCh <- NotifyMessage{
Operation: Remove,
FilePath: directory,
}
}
}
} }

View File

@@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@@ -31,17 +32,9 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/util" "github.com/wencaiwulue/kubevpn/pkg/util"
) )
type Mode string
const (
Mesh Mode = "mesh"
Reverse Mode = "reverse"
)
type ConnectOptions struct { type ConnectOptions struct {
KubeconfigPath string KubeconfigPath string
Namespace string Namespace string
Mode Mode
Headers map[string]string Headers map[string]string
Workloads []string Workloads []string
clientset *kubernetes.Clientset clientset *kubernetes.Clientset
@@ -62,53 +55,42 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) {
return return
} }
tempIps := []*net.IPNet{c.localTunIP}
//wg := &sync.WaitGroup{}
//lock := &sync.Mutex{}
for _, workload := range c.Workloads { for _, workload := range c.Workloads {
if len(workload) > 0 { if len(workload) > 0 {
//wg.Add(1) virtualShadowIp, _ := c.dhcp.RentIPRandom()
/*go*/ c.usedIPs = append(c.usedIPs, virtualShadowIp)
func(finalWorkload string) { configInfo := util.PodRouteConfig{
//defer wg.Done() LocalTunIP: c.localTunIP.IP.String(),
//lock.Lock() InboundPodTunIP: virtualShadowIp.String(),
virtualShadowIp, _ := c.dhcp.RentIPRandom() TrafficManagerRealIP: c.routerIP.String(),
tempIps = append(tempIps, virtualShadowIp) Route: config.CIDR.String(),
//lock.Unlock() }
configInfo := util.PodRouteConfig{ // means mesh mode
LocalTunIP: c.localTunIP.IP.String(), if len(c.Headers) != 0 {
InboundPodTunIP: virtualShadowIp.String(), err = InjectVPNAndEnvoySidecar(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers)
TrafficManagerRealIP: c.routerIP.String(), } else {
Route: config.CIDR.String(), err = InjectVPNSidecar(c.factory, c.Namespace, workload, configInfo)
} }
// TODO OPTIMIZE CODE if err != nil {
if c.Mode == Mesh { log.Error(err)
err = InjectVPNAndEnvoySidecar(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, finalWorkload, configInfo, c.Headers) return err
} else { }
err = InjectVPNSidecar(c.factory, c.Namespace, finalWorkload, configInfo)
}
if err != nil {
log.Error(err)
}
}(workload)
} }
} }
//wg.Wait()
c.usedIPs = tempIps
return return
} }
func (c *ConnectOptions) DoConnect() (err error) { func (c *ConnectOptions) DoConnect() (err error) {
c.addCleanUpResourceHandler(c.clientset, c.Namespace) c.addCleanUpResourceHandler(c.clientset, c.Namespace)
c.cidrs, err = util.GetCidrFromCNI(c.clientset, c.restclient, c.config, c.Namespace)
if err != nil {
return
}
trafficMangerNet := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask} trafficMangerNet := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
c.dhcp = NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, &trafficMangerNet) c.dhcp = NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, &trafficMangerNet)
if err = c.dhcp.InitDHCP(); err != nil { if err = c.dhcp.InitDHCP(); err != nil {
return return
} }
err = c.GetCIDR()
if err != nil {
return
}
c.routerIP, err = CreateOutboundPod(c.clientset, c.Namespace, trafficMangerNet.String(), c.cidrs) c.routerIP, err = CreateOutboundPod(c.clientset, c.Namespace, trafficMangerNet.String(), c.cidrs)
if err != nil { if err != nil {
return return
@@ -420,3 +402,35 @@ func (c *ConnectOptions) GetRunningPodList() ([]v1.Pod, error) {
} }
return list.Items, nil return list.Items, nil
} }
func (c *ConnectOptions) GetCIDR() (err error) {
// (1) get cidr from cache
var value string
value, err = c.dhcp.Get(config.KeyClusterIPv4POOLS)
if err == nil && len(value) != 0 {
for _, s := range strings.Split(value, " ") {
_, cidr, _ := net.ParseCIDR(s)
if cidr != nil {
c.cidrs = append(c.cidrs, cidr)
}
}
}
if len(c.cidrs) != 0 {
return
}
// (2) get cache from cni
c.cidrs, err = util.GetCidrFromCNI(c.clientset, c.restclient, c.config, c.Namespace)
if err == nil {
s := sets.NewString()
for _, cidr := range c.cidrs {
s.Insert(cidr.String())
}
_ = c.dhcp.Set(config.KeyClusterIPv4POOLS, strings.Join(s.List(), " "))
return
}
// (3) fallback to get cidr from node/pod/service
c.cidrs, err = util.GetCIDRFromResource(c.clientset, c.Namespace)
return
}

View File

@@ -34,7 +34,7 @@ var (
) )
func TestGetCIDR(t *testing.T) { func TestGetCIDR(t *testing.T) {
cidr, err := util.GetCIDR(clientsets, namespaces) cidr, err := util.GetCIDRFromResource(clientsets, namespaces)
if err == nil { if err == nil {
for _, ipNet := range cidr { for _, ipNet := range cidr {
fmt.Println(ipNet) fmt.Println(ipNet)

View File

@@ -139,3 +139,34 @@ func (d *DHCPManager) updateDHCPConfigMap(f func(*ipallocator.Range) error) erro
} }
return nil return nil
} }
func (d *DHCPManager) Set(key, value string) error {
cm, err := d.client.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
log.Errorf("failed to get data, err: %v", err)
return err
}
if cm.Data == nil {
cm.Data = make(map[string]string)
}
cm.Data[key] = value
_, err = d.client.Update(context.Background(), cm, metav1.UpdateOptions{})
if err != nil {
log.Errorf("update data failed, err: %v", err)
return err
}
return nil
}
func (d *DHCPManager) Get(key string) (string, error) {
cm, err := d.client.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return "", err
}
if cm != nil && cm.Data != nil {
if v, ok := cm.Data[key]; ok {
return v, nil
}
}
return "", fmt.Errorf("can not get data")
}

View File

@@ -65,48 +65,40 @@ func InjectVPNAndEnvoySidecar(factory cmdutil.Factory, clientset v12.ConfigMapIn
if containerNames.HasAll(config.ContainerSidecarVPN, config.ContainerSidecarEnvoyProxy) { if containerNames.HasAll(config.ContainerSidecarVPN, config.ContainerSidecarEnvoyProxy) {
// add rollback func to remove envoy config // add rollback func to remove envoy config
RollbackFuncList = append(RollbackFuncList, func() { RollbackFuncList = append(RollbackFuncList, func() {
err = removeEnvoyConfig(clientset, nodeID, headers) err := UnPatchContainer(factory, clientset, namespace, workloads, headers)
if err != nil { if err != nil {
log.Warnln(err) log.Error(err)
} }
}) })
return nil return nil
} }
// (1) add mesh container
removePatch, restorePatch := patch(origin, path)
b, _ := json.Marshal(restorePatch)
mesh.AddMeshContainer(templateSpec, nodeID, c) mesh.AddMeshContainer(templateSpec, nodeID, c)
helper := pkgresource.NewHelper(object.Client, object.Mapping) helper := pkgresource.NewHelper(object.Client, object.Mapping)
bytes, err := json.Marshal([]struct { ps := []P{{
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value"`
}{{
Op: "replace", Op: "replace",
Path: "/" + strings.Join(append(path, "spec"), "/"), Path: "/" + strings.Join(append(path, "spec"), "/"),
Value: templateSpec.Spec, Value: templateSpec.Spec,
}}) }, {
Op: "replace",
Path: "/metadata/annotations/probe",
Value: b,
}}
bytes, err := json.Marshal(append(ps, removePatch...))
if err != nil { if err != nil {
return err return err
} }
//t := true _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{
//Force: &t,
})
removePatch, restorePatch := patch(origin, path)
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, removePatch, &metav1.PatchOptions{})
if err != nil { if err != nil {
log.Warnf("error while remove probe of resource: %s %s, ignore, err: %v", log.Warnf("error while remove probe of resource: %s %s, ignore, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
} }
RollbackFuncList = append(RollbackFuncList, func() { RollbackFuncList = append(RollbackFuncList, func() {
if err = UnPatchContainer(factory, clientset, namespace, workloads, headers); err != nil { if err = UnPatchContainer(factory, clientset, namespace, workloads, headers); err != nil {
log.Error(err) log.Error(err)
} }
if _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, restorePatch, &metav1.PatchOptions{}); err != nil {
log.Warnf("error while restore probe of resource: %s %s, ignore, err: %v",
object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
}
}) })
_ = util.RolloutStatus(factory, namespace, workloads, time.Minute*5) _ = util.RolloutStatus(factory, namespace, workloads, time.Minute*5)
return err return err
@@ -128,30 +120,34 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
err = removeEnvoyConfig(mapInterface, nodeID, headers) var empty bool
empty, err = removeEnvoyConfig(mapInterface, nodeID, headers)
if err != nil { if err != nil {
log.Warnln(err) log.Warnln(err)
return err return err
} }
mesh.RemoveContainers(templateSpec) if empty {
helper := pkgresource.NewHelper(object.Client, object.Mapping) mesh.RemoveContainers(templateSpec)
bytes, err := json.Marshal([]struct { helper := pkgresource.NewHelper(object.Client, object.Mapping)
Op string `json:"op"` bytes, err := json.Marshal([]struct {
Path string `json:"path"` Op string `json:"op"`
Value interface{} `json:"value"` Path string `json:"path"`
}{{ Value interface{} `json:"value"`
Op: "replace", }{{
Path: "/" + strings.Join(append(depth, "spec"), "/"), Op: "replace",
Value: templateSpec.Spec, Path: "/" + strings.Join(append(depth, "spec"), "/"),
}}) Value: templateSpec.Spec,
if err != nil { }, {
return err Op: "replace",
Path: "/metadata/annotations/probe",
Value: "",
}})
if err != nil {
return err
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
} }
//t := true
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{
//Force: &t,
})
return err return err
} }
@@ -198,21 +194,21 @@ func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUN
return err return err
} }
func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, headers map[string]string) error { func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, headers map[string]string) (bool, error) {
configMap, err := mapInterface.Get(context.TODO(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) configMap, err := mapInterface.Get(context.TODO(), config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if k8serrors.IsNotFound(err) { if k8serrors.IsNotFound(err) {
return nil return true, nil
} }
if err != nil { if err != nil {
return err return false, err
} }
str, ok := configMap.Data[config.KeyEnvoy] str, ok := configMap.Data[config.KeyEnvoy]
if !ok { if !ok {
return errors.New("can not found value for key: envoy-config.yaml") return false, errors.New("can not found value for key: envoy-config.yaml")
} }
var v []*controlplane.Virtual var v []*controlplane.Virtual
if err = yaml.Unmarshal([]byte(str), &v); err != nil { if err = yaml.Unmarshal([]byte(str), &v); err != nil {
return err return false, err
} }
for _, virtual := range v { for _, virtual := range v {
if nodeID == virtual.Uid { if nodeID == virtual.Uid {
@@ -224,20 +220,22 @@ func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, heade
} }
} }
} }
var empty bool
// remove default // remove default
for i := 0; i < len(v); i++ { for i := 0; i < len(v); i++ {
if nodeID == v[i].Uid && len(v[i].Rules) == 0 { if nodeID == v[i].Uid && len(v[i].Rules) == 0 {
v = append(v[:i], v[i+1:]...) v = append(v[:i], v[i+1:]...)
i-- i--
empty = true
} }
} }
marshal, err := yaml.Marshal(v) marshal, err := yaml.Marshal(v)
if err != nil { if err != nil {
return err return false, err
} }
configMap.Data[config.KeyEnvoy] = string(marshal) configMap.Data[config.KeyEnvoy] = string(marshal)
_, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{}) _, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{})
return err return empty, err
} }
func contains(a map[string]string, sub map[string]string) bool { func contains(a map[string]string, sub map[string]string) bool {

View File

@@ -82,6 +82,17 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi
s = append(s, ipNet.String()) s = append(s, ipNet.String())
} }
var Resources = v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("128m"),
v1.ResourceMemory: resource.MustParse("256Mi"),
},
Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("256m"),
v1.ResourceMemory: resource.MustParse("512Mi"),
},
}
deployment := &appsv1.Deployment{ deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager, Name: config.ConfigMapPodTrafficManager,
@@ -147,16 +158,7 @@ kubevpn serve -L tcp://:10800 -L tun://:8422?net=${TrafficManagerIP} --debug=tru
ContainerPort: 10800, ContainerPort: 10800,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
}}, }},
Resources: v1.ResourceRequirements{ Resources: Resources,
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("128m"),
v1.ResourceMemory: resource.MustParse("256Mi"),
},
Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("256m"),
v1.ResourceMemory: resource.MustParse("512Mi"),
},
},
ImagePullPolicy: v1.PullIfNotPresent, ImagePullPolicy: v1.PullIfNotPresent,
SecurityContext: &v1.SecurityContext{ SecurityContext: &v1.SecurityContext{
Capabilities: &v1.Capabilities{ Capabilities: &v1.Capabilities{
@@ -187,6 +189,7 @@ kubevpn serve -L tcp://:10800 -L tun://:8422?net=${TrafficManagerIP} --debug=tru
}, },
}, },
ImagePullPolicy: v1.PullIfNotPresent, ImagePullPolicy: v1.PullIfNotPresent,
Resources: Resources,
}, },
}, },
RestartPolicy: v1.RestartPolicyAlways, RestartPolicy: v1.RestartPolicyAlways,
@@ -269,31 +272,26 @@ func InjectVPNSidecar(factory cmdutil.Factory, namespace, workloads string, conf
} else } else
// controllers // controllers
{ {
bytes, _ := json.Marshal([]struct { // remove probe
Op string `json:"op"` removePatch, restorePatch := patch(origin, path)
Path string `json:"path"` p := []P{{
Value interface{} `json:"value"`
}{{
Op: "replace", Op: "replace",
Path: "/" + strings.Join(append(path, "spec"), "/"), Path: "/" + strings.Join(append(path, "spec"), "/"),
Value: podTempSpec.Spec, Value: podTempSpec.Spec,
}}) }}
bytes, _ := json.Marshal(append(p, removePatch...))
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
if err != nil { if err != nil {
log.Errorf("error while inject proxy container, err: %v, exiting...", err) log.Errorf("error while inject proxy container, err: %v, exiting...", err)
return err return err
} }
removePatch, restorePatch := patch(origin, path)
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, removePatch, &metav1.PatchOptions{})
if err != nil {
log.Warnf("error while remove probe of resource: %s %s, ignore, err: %v",
object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
}
RollbackFuncList = append(RollbackFuncList, func() { RollbackFuncList = append(RollbackFuncList, func() {
if err = removeInboundContainer(factory, namespace, workloads); err != nil { if err = removeInboundContainer(factory, namespace, workloads); err != nil {
log.Error(err) log.Error(err)
} }
if _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, restorePatch, &metav1.PatchOptions{}); err != nil { b, _ := json.Marshal(restorePatch)
if _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, b, &metav1.PatchOptions{}); err != nil {
log.Warnf("error while restore probe of resource: %s %s, ignore, err: %v", log.Warnf("error while restore probe of resource: %s %s, ignore, err: %v",
object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err) object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
} }
@@ -397,13 +395,13 @@ func CleanupUselessInfo(pod *v1.Pod) {
pod.SetOwnerReferences(nil) pod.SetOwnerReferences(nil)
} }
func patch(spec v1.PodTemplateSpec, path []string) (removePatch []byte, restorePatch []byte) { type P struct {
type P struct { Op string `json:"op,omitempty"`
Op string `json:"op,omitempty"` Path string `json:"path,omitempty"`
Path string `json:"path,omitempty"` Value interface{} `json:"value,omitempty"`
Value interface{} `json:"value,omitempty"` }
}
var remove, restore []P func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) {
for i := range spec.Spec.Containers { for i := range spec.Spec.Containers {
index := strconv.Itoa(i) index := strconv.Itoa(i)
readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/")
@@ -436,7 +434,5 @@ func patch(spec v1.PodTemplateSpec, path []string) (removePatch []byte, restoreP
Value: spec.Spec.Containers[i].StartupProbe, Value: spec.Spec.Containers[i].StartupProbe,
}) })
} }
removePatch, _ = json.Marshal(remove)
restorePatch, _ = json.Marshal(restore)
return return
} }

View File

@@ -125,7 +125,6 @@ func TestPreCheck(t *testing.T) {
options := ConnectOptions{ options := ConnectOptions{
KubeconfigPath: filepath.Join(homedir.HomeDir(), ".kube", "mesh"), KubeconfigPath: filepath.Join(homedir.HomeDir(), ".kube", "mesh"),
Namespace: "naison-test", Namespace: "naison-test",
Mode: "reverse",
Workloads: []string{"services/authors"}, Workloads: []string{"services/authors"},
} }
options.InitClient() options.InitClient()

View File

@@ -20,27 +20,20 @@ import (
"strings" "strings"
) )
func GetCIDR(clientset *kubernetes.Clientset, namespace string) ([]*net.IPNet, error) { func GetCIDRFromResource(clientset *kubernetes.Clientset, namespace string) ([]*net.IPNet, error) {
var CIDRList []*net.IPNet var list []*net.IPNet
// get pod CIDR from node spec // (1) get pod CIDR from node spec
nodeList, err := clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{}) nodeList, err := clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{})
if err == nil { for _, node := range nodeList.Items {
var podCIDRs = sets.NewString() for _, c := range sets.NewString(node.Spec.PodCIDRs...).Insert(node.Spec.PodCIDR).List() {
for _, node := range nodeList.Items { _, cidr, _ := net.ParseCIDR(c)
if node.Spec.PodCIDRs != nil { if cidr != nil {
podCIDRs.Insert(node.Spec.PodCIDRs...) list = append(list, cidr)
}
if len(node.Spec.PodCIDR) != 0 {
podCIDRs.Insert(node.Spec.PodCIDR)
}
}
for _, podCIDR := range podCIDRs.List() {
if _, CIDR, err := net.ParseCIDR(podCIDR); err == nil {
CIDRList = append(CIDRList, CIDR)
} }
} }
} }
// get pod CIDR from pod ip, why doing this: notice that node's pod cidr is not correct in minikube
// (2) get pod CIDR from pod ip, why doing this: notice that node's pod cidr is not correct in minikube
// ➜ ~ kubectl get nodes -o jsonpath='{.items[*].spec.podCIDR}' // ➜ ~ kubectl get nodes -o jsonpath='{.items[*].spec.podCIDR}'
//10.244.0.0/24% //10.244.0.0/24%
// ➜ ~ kubectl get pods -o=custom-columns=podIP:.status.podIP // ➜ ~ kubectl get pods -o=custom-columns=podIP:.status.podIP
@@ -55,29 +48,27 @@ func GetCIDR(clientset *kubernetes.Clientset, namespace string) ([]*net.IPNet, e
//172.17.0.3 //172.17.0.3
//172.17.0.7 //172.17.0.7
//172.17.0.2 //172.17.0.2
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), v1.ListOptions{}) podList, _ := clientset.CoreV1().Pods(namespace).List(context.TODO(), v1.ListOptions{})
if err == nil { for _, pod := range podList.Items {
for _, pod := range podList.Items { if pod.Spec.HostNetwork {
if pod.Spec.HostNetwork { continue
continue }
if ip := net.ParseIP(pod.Status.PodIP); ip != nil {
var contain bool
for _, cidr := range list {
if cidr.Contains(ip) {
contain = true
break
}
} }
if ip := net.ParseIP(pod.Status.PodIP); ip != nil { if !contain {
var contain bool mask := net.CIDRMask(24, 32)
for _, CIDR := range CIDRList { list = append(list, &net.IPNet{IP: ip.Mask(mask), Mask: mask})
if CIDR.Contains(ip) {
contain = true
break
}
}
if !contain {
mask := net.CIDRMask(24, 32)
CIDRList = append(CIDRList, &net.IPNet{IP: ip.Mask(mask), Mask: mask})
}
} }
} }
} }
// get service CIDR // (3) get service CIDR
defaultCIDRIndex := "The range of valid IPs is" defaultCIDRIndex := "The range of valid IPs is"
_, err = clientset.CoreV1().Services(namespace).Create(context.TODO(), &v12.Service{ _, err = clientset.CoreV1().Services(namespace).Create(context.TODO(), &v12.Service{
ObjectMeta: v1.ObjectMeta{GenerateName: "foo-svc-"}, ObjectMeta: v1.ObjectMeta{GenerateName: "foo-svc-"},
@@ -86,36 +77,34 @@ func GetCIDR(clientset *kubernetes.Clientset, namespace string) ([]*net.IPNet, e
if err != nil { if err != nil {
idx := strings.LastIndex(err.Error(), defaultCIDRIndex) idx := strings.LastIndex(err.Error(), defaultCIDRIndex)
if idx != -1 { if idx != -1 {
_, cidr, err := net.ParseCIDR(strings.TrimSpace(err.Error()[idx+len(defaultCIDRIndex):])) _, cidr, _ := net.ParseCIDR(strings.TrimSpace(err.Error()[idx+len(defaultCIDRIndex):]))
if err == nil { if cidr != nil {
CIDRList = append(CIDRList, cidr) list = append(list, cidr)
} }
} }
} else { } else {
serviceList, err := clientset.CoreV1().Services(namespace).List(context.TODO(), v1.ListOptions{}) serviceList, _ := clientset.CoreV1().Services(namespace).List(context.TODO(), v1.ListOptions{})
if err == nil { for _, service := range serviceList.Items {
for _, service := range serviceList.Items { if ip := net.ParseIP(service.Spec.ClusterIP); ip != nil {
if ip := net.ParseIP(service.Spec.ClusterIP); ip != nil { var contain bool
var contain bool for _, CIDR := range list {
for _, CIDR := range CIDRList { if CIDR.Contains(ip) {
if CIDR.Contains(ip) { contain = true
contain = true break
break
}
}
if !contain {
mask := net.CIDRMask(16, 32)
CIDRList = append(CIDRList, &net.IPNet{IP: ip.Mask(mask), Mask: mask})
} }
} }
if !contain {
mask := net.CIDRMask(24, 32)
list = append(list, &net.IPNet{IP: ip.Mask(mask), Mask: mask})
}
} }
} }
} }
// remove duplicate CIDR // (4) remove duplicate CIDR
result := make([]*net.IPNet, 0) result := make([]*net.IPNet, 0)
set := sets.NewString() set := sets.NewString()
for _, cidr := range CIDRList { for _, cidr := range list {
if !set.Has(cidr.String()) { if !set.Has(cidr.String()) {
set.Insert(cidr.String()) set.Insert(cidr.String())
result = append(result, cidr) result = append(result, cidr)
@@ -127,18 +116,18 @@ func GetCIDR(clientset *kubernetes.Clientset, namespace string) ([]*net.IPNet, e
return result, nil return result, nil
} }
// todo use patch to update this pod
func GetCidrFromCNI(clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) { func GetCidrFromCNI(clientset *kubernetes.Clientset, restclient *rest.RESTClient, restconfig *rest.Config, namespace string) ([]*net.IPNet, error) {
var name = "cni-net-dir-kubevpn"
hostPathType := v12.HostPathDirectoryOrCreate hostPathType := v12.HostPathDirectoryOrCreate
pod := &v12.Pod{ pod := &v12.Pod{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager, Name: name,
Namespace: namespace, Namespace: namespace,
}, },
Spec: v12.PodSpec{ Spec: v12.PodSpec{
Volumes: []v12.Volume{ Volumes: []v12.Volume{
{ {
Name: "cni-net-dir", Name: name,
VolumeSource: v12.VolumeSource{ VolumeSource: v12.VolumeSource{
HostPath: &v12.HostPathVolumeSource{ HostPath: &v12.HostPathVolumeSource{
Path: config.DefaultNetDir, Path: config.DefaultNetDir,
@@ -149,7 +138,7 @@ func GetCidrFromCNI(clientset *kubernetes.Clientset, restclient *rest.RESTClient
}, },
Containers: []v12.Container{ Containers: []v12.Container{
{ {
Name: config.ContainerSidecarVPN, Name: name,
Image: "guosen-dev.cargo.io/epscplibrary/kubevpn:latest", Image: "guosen-dev.cargo.io/epscplibrary/kubevpn:latest",
Command: []string{"tail", "-f", "/dev/null"}, Command: []string{"tail", "-f", "/dev/null"},
Resources: v12.ResourceRequirements{ Resources: v12.ResourceRequirements{
@@ -164,7 +153,7 @@ func GetCidrFromCNI(clientset *kubernetes.Clientset, restclient *rest.RESTClient
}, },
VolumeMounts: []v12.VolumeMount{ VolumeMounts: []v12.VolumeMount{
{ {
Name: "cni-net-dir", Name: name,
ReadOnly: true, ReadOnly: true,
MountPath: config.DefaultNetDir, MountPath: config.DefaultNetDir,
}, },
@@ -176,15 +165,18 @@ func GetCidrFromCNI(clientset *kubernetes.Clientset, restclient *rest.RESTClient
} }
get, err := clientset.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, v1.GetOptions{}) get, err := clientset.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, v1.GetOptions{})
if k8serrors.IsNotFound(err) || get.Status.Phase != v12.PodRunning { if k8serrors.IsNotFound(err) || get.Status.Phase != v12.PodRunning {
create, err := clientset.CoreV1().Pods(namespace).Create(context.Background(), pod, v1.CreateOptions{}) var deleteFunc = func() {
_ = clientset.CoreV1().Pods(namespace).Delete(context.Background(), pod.Name, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)})
}
// delete pod anyway
deleteFunc()
pod, err = clientset.CoreV1().Pods(namespace).Create(context.Background(), pod, v1.CreateOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer clientset.CoreV1().Pods(namespace).Delete(context.TODO(), create.Name, v1.DeleteOptions{ defer deleteFunc()
GracePeriodSeconds: pointer.Int64(0),
})
err = WaitPod(clientset.CoreV1().Pods(namespace), v1.ListOptions{ err = WaitPod(clientset.CoreV1().Pods(namespace), v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", create.Name).String(), FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.Name).String(),
}, func(pod *v12.Pod) bool { }, func(pod *v12.Pod) bool {
return pod.Status.Phase == v12.PodRunning return pod.Status.Phase == v12.PodRunning
}) })
@@ -223,5 +215,9 @@ func GetCidrFromCNI(clientset *kubernetes.Clientset, restclient *rest.RESTClient
} }
} }
if len(result) == 0 {
return nil, fmt.Errorf("can not found any cidr")
}
return result, nil return result, nil
} }

View File

@@ -198,7 +198,6 @@ func Shell(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config
Stderr: StreamOptions.ErrOut != nil, Stderr: StreamOptions.ErrOut != nil,
TTY: tt.Raw, TTY: tt.Raw,
}, scheme.ParameterCodec) }, scheme.ParameterCodec)
fmt.Println(req.URL())
return Executor.Execute("POST", req.URL(), config, StreamOptions.In, StreamOptions.Out, StreamOptions.ErrOut, tt.Raw, sizeQueue) return Executor.Execute("POST", req.URL(), config, StreamOptions.In, StreamOptions.Out, StreamOptions.ErrOut, tt.Raw, sizeQueue)
} }

View File

@@ -45,6 +45,14 @@ spec:
- containerPort: 9080 - containerPort: 9080
securityContext: securityContext:
runAsUser: 1000 runAsUser: 1000
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
--- ---
# Ratings service # Ratings service
apiVersion: v1 apiVersion: v1
@@ -93,6 +101,13 @@ spec:
- containerPort: 9080 - containerPort: 9080
securityContext: securityContext:
runAsUser: 1000 runAsUser: 1000
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
--- ---
# Reviews service # Reviews service
apiVersion: v1 apiVersion: v1
@@ -149,6 +164,13 @@ spec:
mountPath: /opt/ibm/wlp/output mountPath: /opt/ibm/wlp/output
securityContext: securityContext:
runAsUser: 1000 runAsUser: 1000
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
volumes: volumes:
- name: wlp-output - name: wlp-output
emptyDir: { } emptyDir: { }
@@ -205,6 +227,13 @@ spec:
mountPath: /tmp mountPath: /tmp
securityContext: securityContext:
runAsUser: 1000 runAsUser: 1000
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
volumes: volumes:
- name: tmp - name: tmp
emptyDir: { } emptyDir: { }