hotfix: fix dev/clone mode bug (#640)

naison authored on 2025-06-12 12:59:39 +08:00, committed by GitHub
parent 072e67ce6c
commit 4949df56ef
14 changed files with 95 additions and 130 deletions

View File

@@ -62,7 +62,7 @@ func (svr *Server) Clone(resp rpc.Daemon_CloneServer) (err error) {
if err != nil {
return err
}
- err = connResp.SendMsg(&connReq)
+ err = connResp.Send(connReq)
if err != nil {
return err
}
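
Note: why the one-line change above fixes a real bug. The generated, typed Send is compile-time checked, while the embedded gRPC stream's SendMsg accepts any value, so passing &connReq where connReq is already a pointer hands the marshaler a double pointer that only fails at runtime. A minimal self-contained sketch of that failure mode (the names are illustrative, not kubevpn's generated code):

package main

import "fmt"

type Request struct{ Name string }

// sendMsg mimics grpc's stream.SendMsg: it takes any value, so a double
// pointer slips past the compiler and only fails when (de)serializing.
func sendMsg(m any) error {
	if _, ok := m.(*Request); !ok {
		return fmt.Errorf("cannot marshal %T", m)
	}
	return nil
}

// send mimics the generated, typed Send: a wrong shape cannot compile.
func send(m *Request) error { return sendMsg(m) }

func main() {
	req := &Request{Name: "clone"}
	fmt.Println(sendMsg(&req)) // old call shape: **Request, fails at runtime
	fmt.Println(send(req))     // new call shape: checked at compile time
}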

View File

@@ -85,7 +85,7 @@ func (svr *Server) Disconnect(resp rpc.Daemon_DisconnectServer) error {
plog.G(ctx).Errorf("Index %d out of range", req.GetID())
}
case req.KubeconfigBytes != nil && req.Namespace != nil:
- err = disconnectByKubeConfig(
+ err = disconnectByKubeconfig(
resp.Context(),
svr,
req.GetKubeconfigBytes(),
@@ -133,7 +133,7 @@ func (svr *Server) Disconnect(resp rpc.Daemon_DisconnectServer) error {
return nil
}
- func disconnectByKubeConfig(ctx context.Context, svr *Server, kubeconfigBytes string, ns string, jump *rpc.SshJump) error {
+ func disconnectByKubeconfig(ctx context.Context, svr *Server, kubeconfigBytes string, ns string, jump *rpc.SshJump) error {
file, err := util.ConvertToTempKubeconfigFile([]byte(kubeconfigBytes))
if err != nil {
return err
@@ -156,10 +156,6 @@ func disconnectByKubeConfig(ctx context.Context, svr *Server, kubeconfigBytes st
}
func disconnect(ctx context.Context, svr *Server, connect *handler.ConnectOptions) {
- _, err := svr.GetClient(false)
- if err != nil {
- return
- }
if svr.connect != nil {
isSameCluster, _ := util.IsSameCluster(
ctx,

View File

@@ -134,7 +134,7 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
if err != nil {
return err
}
- _ = util.PrintGRPCStream[rpc.DisconnectResponse](ctx, resp)
+ _ = util.PrintGRPCStream[rpc.DisconnectResponse](nil, resp)
return nil
})
var resp rpc.Daemon_ProxyClient
@@ -185,7 +185,7 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
}
func (option *Options) Dev(ctx context.Context, config *Config, hostConfig *HostConfig) error {
- templateSpec, err := option.GetPodTemplateSpec()
+ templateSpec, err := option.GetPodTemplateSpec(ctx)
if err != nil {
plog.G(ctx).Errorf("Failed to get unstructured object error: %v", err)
return err
@@ -237,7 +237,7 @@ func (option *Options) Dev(ctx context.Context, config *Config, hostConfig *Host
}
func (option *Options) CreateConnectContainer(ctx context.Context, portBindings nat.PortMap, managerNamespace string) (*string, error) {
- portMap, portSet, err := option.GetExposePort(portBindings)
+ portMap, portSet, err := option.GetExposePort(ctx, portBindings)
if err != nil {
return nil, err
}
@@ -342,8 +342,8 @@ func (option *Options) GetRollbackFuncList() []func() error {
return option.rollbackFuncList
}
- func (option *Options) GetExposePort(portBinds nat.PortMap) (nat.PortMap, nat.PortSet, error) {
- templateSpec, err := option.GetPodTemplateSpec()
+ func (option *Options) GetExposePort(ctx context.Context, portBinds nat.PortMap) (nat.PortMap, nat.PortSet, error) {
+ templateSpec, err := option.GetPodTemplateSpec(ctx)
if err != nil {
plog.G(context.Background()).Errorf("Failed to get unstructured object error: %v", err)
return nil, nil, err
@@ -390,13 +390,13 @@ func (option *Options) InitClient(f cmdutil.Factory) (err error) {
return
}
- func (option *Options) GetPodTemplateSpec() (*v1.PodTemplateSpec, error) {
- object, err := util.GetUnstructuredObject(option.factory, option.Namespace, option.Workload)
+ func (option *Options) GetPodTemplateSpec(ctx context.Context) (*v1.PodTemplateSpec, error) {
+ _, controller, err := util.GetTopOwnerObject(ctx, option.factory, option.Namespace, option.Workload)
if err != nil {
return nil, err
}
- u := object.Object.(*unstructured.Unstructured)
+ u := controller.Object.(*unstructured.Unstructured)
var templateSpec *v1.PodTemplateSpec
templateSpec, _, err = util.GetPodTemplateSpecPath(u)
return templateSpec, err
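
Note: GetPodTemplateSpec now resolves the top owner first, then digs the pod template out of the resulting unstructured object via GetPodTemplateSpecPath. A self-contained sketch of that extraction, assuming a Deployment-shaped object whose template sits at spec.template (other kinds keep it at other paths):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
)

func main() {
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1", "kind": "Deployment",
		"metadata": map[string]interface{}{"name": "web"},
		"spec": map[string]interface{}{
			"template": map[string]interface{}{
				"spec": map[string]interface{}{
					"containers": []interface{}{
						map[string]interface{}{"name": "app", "image": "nginx"},
					},
				},
			},
		},
	}}
	// Pull the nested template map, then convert it into a typed struct.
	raw, found, err := unstructured.NestedMap(u.Object, "spec", "template")
	if err != nil || !found {
		panic(fmt.Sprintf("no pod template: %v", err))
	}
	var tpl corev1.PodTemplateSpec
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(raw, &tpl); err != nil {
		panic(err)
	}
	fmt.Println(tpl.Spec.Containers[0].Image) // nginx
}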

View File

@@ -71,7 +71,7 @@ func (l ConfigList) Run(ctx context.Context) error {
}
if index != 0 {
- err := WaitDockerContainerRunning(ctx, conf.name)
+ err = WaitDockerContainerRunning(ctx, conf.name)
if err != nil {
return err
}
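
Note: the `:=` to `=` change above is a classic Go shadowing fix: inside the `if index != 0` block, `:=` declared a fresh err that masked the function-level one. A compact demonstration of the pitfall (simplified, since the full loop body is not shown in this hunk):

package main

import (
	"errors"
	"fmt"
)

func wait(name string) error { return errors.New(name + ": not running") }

// With `:=`, any error check that sits outside the inner block silently
// reads the outer, still-nil err.
func run(index int) (err error) {
	if index != 0 {
		err := wait("container") // shadows the named return value
		_ = err                  // the failure never escapes this block
	}
	return err // returns nil even though wait failed
}

func main() {
	fmt.Println(run(1)) // <nil> -- the error was dropped
}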

View File

@@ -12,7 +12,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
@@ -21,14 +21,14 @@ import (
)
type Manager struct {
- client corev1.ConfigMapInterface
+ client *kubernetes.Clientset
cidr *net.IPNet
cidr6 *net.IPNet
namespace string
clusterID types.UID
}
- func NewDHCPManager(client corev1.ConfigMapInterface, namespace string) *Manager {
+ func NewDHCPManager(client *kubernetes.Clientset, namespace string) *Manager {
return &Manager{
client: client,
namespace: namespace,
@@ -40,14 +40,14 @@ func NewDHCPManager(client corev1.ConfigMapInterface, namespace string) *Manager
// InitDHCP
// TODO optimize dhcp, using mac address, ip and deadline as unit
func (m *Manager) InitDHCP(ctx context.Context) error {
- cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
+ cm, err := m.client.CoreV1().ConfigMaps(m.namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get configmap %s, err: %v", config.ConfigMapPodTrafficManager, err)
}
if err == nil {
- m.clusterID = util.GetClusterIDByCM(cm)
- return nil
+ m.clusterID, err = util.GetClusterID(ctx, m.client.CoreV1().Namespaces(), m.namespace)
+ return err
}
cm = &v1.ConfigMap{
@@ -63,12 +63,12 @@ func (m *Manager) InitDHCP(ctx context.Context) error {
config.KeyClusterIPv4POOLS: "",
},
}
- cm, err = m.client.Create(ctx, cm, metav1.CreateOptions{})
+ cm, err = m.client.CoreV1().ConfigMaps(m.namespace).Create(ctx, cm, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create configmap: %v", err)
}
- m.clusterID = util.GetClusterIDByCM(cm)
- return nil
+ m.clusterID, err = util.GetClusterID(ctx, m.client.CoreV1().Namespaces(), m.namespace)
+ return err
}
func (m *Manager) RentIP(ctx context.Context) (*net.IPNet, *net.IPNet, error) {
@@ -134,7 +134,7 @@ func (m *Manager) ReleaseIP(ctx context.Context, ips ...net.IP) error {
}
func (m *Manager) updateDHCPConfigMap(ctx context.Context, f func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error) error {
- cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
+ cm, err := m.client.CoreV1().ConfigMaps(m.namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get configmap DHCP server, err: %v", err)
}
@@ -189,7 +189,7 @@ func (m *Manager) updateDHCPConfigMap(ctx context.Context, f func(ipv4 *ipalloca
return err
}
cm.Data[config.KeyDHCP6] = base64.StdEncoding.EncodeToString(bytes)
- _, err = m.client.Update(ctx, cm, metav1.UpdateOptions{})
+ _, err = m.client.CoreV1().ConfigMaps(m.namespace).Update(ctx, cm, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update DHCP: %v", err)
}
@@ -201,7 +201,7 @@ func (m *Manager) Set(ctx context.Context, key, value string) error {
retry.DefaultRetry,
func() error {
p := []byte(fmt.Sprintf(`[{"op": "replace", "path": "/data/%s", "value": "%s"}]`, key, value))
- _, err := m.client.Patch(ctx, config.ConfigMapPodTrafficManager, types.JSONPatchType, p, metav1.PatchOptions{})
+ _, err := m.client.CoreV1().ConfigMaps(m.namespace).Patch(ctx, config.ConfigMapPodTrafficManager, types.JSONPatchType, p, metav1.PatchOptions{})
return err
})
if err != nil {
@@ -212,7 +212,7 @@ func (m *Manager) Set(ctx context.Context, key, value string) error {
}
func (m *Manager) Get(ctx context.Context, key string) (string, error) {
- cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
+ cm, err := m.client.CoreV1().ConfigMaps(m.namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return "", err
}
@@ -220,7 +220,7 @@ func (m *Manager) Get(ctx context.Context, key string) (string, error) {
}
func (m *Manager) ForEach(ctx context.Context, fnv4 func(net.IP), fnv6 func(net.IP)) error {
- cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
+ cm, err := m.client.CoreV1().ConfigMaps(m.namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get cm DHCP server, err: %v", err)
}
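
Note: the thread through all of these hunks is that Manager now holds the whole *kubernetes.Clientset instead of a single ConfigMapInterface, because InitDHCP needs two resource types: ConfigMaps for the lease data and Namespaces for the cluster ID. A hedged, self-contained sketch of those two calls (the kubeconfig path, namespace, and ConfigMap name are illustrative assumptions):

package main

import (
	"context"
	"fmt"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()
	// The lease store lives in a ConfigMap, scoped per namespace at call time.
	cm, err := client.CoreV1().ConfigMaps("vpn").Get(ctx, "kubevpn-traffic-manager", metav1.GetOptions{})
	if err != nil {
		log.Fatal(err)
	}
	// The cluster identity now comes from the namespace UID rather than the
	// ConfigMap UID (see the util change later in this commit).
	ns, err := client.CoreV1().Namespaces().Get(ctx, "vpn", metav1.GetOptions{})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(cm.Name, ns.UID)
}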

View File

@@ -29,8 +29,7 @@ func (s *Server) RentIP(ctx context.Context, req *rpc.RentIPRequest) (*rpc.RentI
defer s.Unlock()
plog.G(ctx).Infof("Handling rent IP request, pod name: %s, ns: %s", req.PodName, req.PodNamespace)
- mapInterface := s.clientset.CoreV1().ConfigMaps(req.PodNamespace)
- manager := NewDHCPManager(mapInterface, req.PodNamespace)
+ manager := NewDHCPManager(s.clientset, req.PodNamespace)
v4, v6, err := manager.RentIP(ctx)
if err != nil {
plog.G(ctx).Errorf("Failed to rent IP: %v", err)
@@ -59,8 +58,7 @@ func (s *Server) ReleaseIP(ctx context.Context, req *rpc.ReleaseIPRequest) (*rpc
ips = append(ips, ip)
}
- mapInterface := s.clientset.CoreV1().ConfigMaps(req.PodNamespace)
- manager := NewDHCPManager(mapInterface, req.PodNamespace)
+ manager := NewDHCPManager(s.clientset, req.PodNamespace)
if err := manager.ReleaseIP(ctx, ips...); err != nil {
plog.G(ctx).Errorf("Failed to release IP: %v", err)
return nil, err

View File

@@ -97,11 +97,11 @@ func (d *CloneOptions) DoClone(ctx context.Context, kubeconfigJsonBytes []byte,
}
for _, workload := range d.Workloads {
plog.G(ctx).Infof("Clone workload %s", workload)
- object, err := util.GetUnstructuredObject(d.factory, d.Namespace, workload)
+ _, controller, err := util.GetTopOwnerObject(ctx, d.factory, d.Namespace, workload)
if err != nil {
return err
}
- u := object.Object.(*unstructured.Unstructured)
+ u := controller.Object.(*unstructured.Unstructured)
if err = unstructured.SetNestedField(u.UnstructuredContent(), int64(1), "spec", "replicas"); err != nil {
plog.G(ctx).Warnf("Failed to set repilcaset to 1: %v", err)
}
@@ -113,7 +113,7 @@ func (d *CloneOptions) DoClone(ctx context.Context, kubeconfigJsonBytes []byte,
}
originName := u.GetName()
u.SetName(fmt.Sprintf("%s-clone-%s", u.GetName(), newUUID.String()[:5]))
- d.TargetWorkloadNames[workload] = fmt.Sprintf("%s/%s", object.Mapping.Resource.GroupResource().Resource, u.GetName())
+ d.TargetWorkloadNames[workload] = fmt.Sprintf("%s/%s", controller.Mapping.Resource.GroupResource().Resource, u.GetName())
labelsMap := map[string]string{
config.ManageBy: config.ConfigMapPodTrafficManager,
"owner-ref": u.GetName(),
@@ -242,15 +242,11 @@ func (d *CloneOptions) DoClone(ctx context.Context, kubeconfigJsonBytes []byte,
if err != nil {
return err
}
- //v := unstructured.Unstructured{}
- //_, _, err = clientgoscheme.Codecs.UniversalDecoder(object.Mapping.GroupVersionKind.GroupVersion()).Decode(marshal, nil, &v)
- //_, _, err = unstructured.UnstructuredJSONScheme.Decode(marshal, &object.Mapping.GroupVersionKind, &v)
if err = unstructured.SetNestedField(u.Object, m, path...); err != nil {
return err
}
- _, createErr := client.Resource(object.Mapping.Resource).Namespace(d.Namespace).Create(context.Background(), u, metav1.CreateOptions{})
- //_, createErr := runtimeresource.NewHelper(object.Client, object.Mapping).Create(d.TargetNamespace, true, u)
+ _, createErr := client.Resource(controller.Mapping.Resource).Namespace(d.Namespace).Create(context.Background(), u, metav1.CreateOptions{})
return createErr
})
if retryErr != nil {
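
Note: besides switching to the top owner, DoClone keeps mutating the copy through the unstructured API, as the context lines show. A small runnable sketch of those edits with made-up values; SetNestedField only accepts deep-copy-safe types, which is why the replica count must be an int64:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1", "kind": "Deployment",
		"metadata": map[string]interface{}{"name": "web"},
		"spec":     map[string]interface{}{"replicas": int64(3)},
	}}
	// Pin the clone to a single replica, as the diff does.
	if err := unstructured.SetNestedField(u.Object, int64(1), "spec", "replicas"); err != nil {
		panic(err)
	}
	// Rename it with a short random suffix (suffix invented here).
	u.SetName(fmt.Sprintf("%s-clone-%s", u.GetName(), "ab3f1"))
	fmt.Println(u.GetName(), u.Object["spec"])
}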

View File

@@ -100,7 +100,7 @@ func (c *ConnectOptions) Context() context.Context {
func (c *ConnectOptions) InitDHCP(ctx context.Context) error {
if c.dhcp == nil {
- c.dhcp = dhcp.NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace)
+ c.dhcp = dhcp.NewDHCPManager(c.clientset, c.Namespace)
return c.dhcp.InitDHCP(ctx)
}
return nil
@@ -211,7 +211,7 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context, namespace s
func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error) {
c.ctx, c.cancel = context.WithCancel(ctx)
plog.G(ctx).Info("Starting connect to cluster")
- m := dhcp.NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace)
+ m := dhcp.NewDHCPManager(c.clientset, c.Namespace)
if err = m.InitDHCP(c.ctx); err != nil {
plog.G(ctx).Errorf("Init DHCP server failed: %v", err)
return

View File

@@ -13,8 +13,6 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
func GetDNSServiceIPFromPod(ctx context.Context, clientset *kubernetes.Clientset, conf *rest.Config, podName, namespace string) (*dns.ClientConfig, error) {
@@ -73,10 +71,5 @@ func GetDNS(ctx context.Context, clientSet *kubernetes.Clientset, restConfig *re
if err != nil {
return nil, err
}
- svc, err := clientSet.CoreV1().Services(ns).Get(ctx, config.ConfigMapPodTrafficManager, v12.GetOptions{})
- if err != nil {
- return nil, err
- }
- clientConfig.Servers = []string{svc.Spec.ClusterIP}
return clientConfig, nil
}
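
Note: after this deletion, GetDNS returns the ClientConfig parsed from the pod's resolv.conf unchanged instead of overwriting Servers with the traffic-manager Service's ClusterIP. For reference, a self-contained sketch of what such a config carries (the resolv.conf content is invented):

package main

import (
	"fmt"
	"strings"

	"github.com/miekg/dns"
)

func main() {
	// Parse a resolv.conf-style document the way GetDNS's helper would.
	conf, err := dns.ClientConfigFromReader(strings.NewReader(
		"nameserver 10.96.0.10\nsearch default.svc.cluster.local svc.cluster.local\n"))
	if err != nil {
		panic(err)
	}
	fmt.Println(conf.Servers, conf.Search)
}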

View File

@@ -12,7 +12,6 @@ import (
"unsafe"
errors2 "github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -32,28 +31,23 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
- func GetClusterID(ctx context.Context, client v12.ConfigMapInterface) (types.UID, error) {
- configMap, err := client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
+ func GetClusterID(ctx context.Context, client v12.NamespaceInterface, ns string) (types.UID, error) {
+ namespace, err := client.Get(ctx, ns, metav1.GetOptions{})
if err != nil {
return "", err
}
- return configMap.UID, nil
+ return namespace.UID, nil
}
- func GetClusterIDByCM(cm *v1.ConfigMap) types.UID {
- return cm.UID
- }
- func IsSameCluster(ctx context.Context, client v12.CoreV1Interface, namespace string, clientB v12.CoreV1Interface, namespaceB string) (bool, error) {
- if namespace != namespaceB {
+ func IsSameCluster(ctx context.Context, clientA v12.CoreV1Interface, namespaceA string, clientB v12.CoreV1Interface, namespaceB string) (bool, error) {
+ if namespaceA != namespaceB {
return false, nil
}
- clusterIDA, err := GetClusterID(ctx, client.ConfigMaps(namespace))
+ clusterIDA, err := GetClusterID(ctx, clientA.Namespaces(), namespaceA)
if err != nil {
return false, err
}
- var clusterIDB types.UID
- clusterIDB, err = GetClusterID(ctx, clientB.ConfigMaps(namespaceB))
+ clusterIDB, err := GetClusterID(ctx, clientB.Namespaces(), namespaceB)
if err != nil {
return false, err
}
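
Note: cluster identity is now keyed on the namespace UID rather than a ConfigMap UID, which presumably keeps IsSameCluster stable even if the traffic-manager ConfigMap is deleted and recreated. A runnable sketch of the new contract using client-go's fake clientset (the UID value is made up):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset(&corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: "default", UID: "11111111-aaaa"},
	})
	ns, err := client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// Two connections count as "the same cluster" only if both the namespace
	// name and this UID match; a recreated namespace gets a fresh UID.
	fmt.Println(ns.UID)
}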

View File

@@ -19,7 +19,6 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/sets"
@@ -189,45 +188,7 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
}
}
- func GetTopOwnerReference(factory util.Factory, ns, workload string) (object, controller *resource.Info, err error) {
- object, err = GetUnstructuredObject(factory, ns, workload)
- if err != nil {
- return nil, nil, err
- }
- ownerRef := v1.GetControllerOf(object.Object.(*unstructured.Unstructured))
- if ownerRef == nil {
- return object, object, err
- }
- var owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name)
- for {
- controller, err = GetUnstructuredObject(factory, ns, owner)
- if err != nil {
- return nil, nil, err
- }
- ownerRef = v1.GetControllerOf(controller.Object.(*unstructured.Unstructured))
- if ownerRef == nil {
- return object, controller, nil
- }
- owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name)
- }
- }
- // GetTopOwnerReferenceBySelector assume pods, controller has same labels
- func GetTopOwnerReferenceBySelector(factory util.Factory, ns, selector string) (object, controller *resource.Info, err error) {
- objectList, err := GetUnstructuredObjectBySelector(factory, ns, selector)
- if err != nil {
- return nil, nil, err
- }
- for _, info := range objectList {
- if IsK8sService(info) {
- continue
- }
- return GetTopOwnerReference(factory, ns, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
- }
- return nil, nil, fmt.Errorf("can not find controller for %s", selector)
- }
- func Shell(_ context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, ns string, cmd []string) (string, error) {
+ func Shell(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, ns string, cmd []string) (string, error) {
stdin, _, _ := term.StdStreams()
buf := bytes.NewBuffer(nil)
options := exec.ExecOptions{

View File

@@ -62,7 +62,7 @@ func GetUnstructuredObjectList(f util.Factory, ns string, workloads []string) ([
return infos, err
}
- func GetUnstructuredObjectBySelector(f util.Factory, ns string, selector string) ([]*resource.Info, error) {
+ func getUnstructuredObjectBySelector(f util.Factory, ns string, selector string) ([]*resource.Info, error) {
do := f.NewBuilder().
Unstructured().
NamespaceParam(ns).DefaultNamespace().AllNamespaces(false).
@@ -139,7 +139,7 @@ func NormalizedResource(f util.Factory, ns string, workloads []string) ([]string
func GetTopOwnerObject(ctx context.Context, f util.Factory, ns string, workload string) (object, controller *resource.Info, err error) {
// normal workload, like pod with controller, deployments, statefulset, replicaset etc...
- object, controller, err = GetTopOwnerReference(f, ns, workload)
+ object, controller, err = getTopOwnerReference(f, ns, workload)
if err != nil {
return nil, nil, err
}
@@ -168,14 +168,52 @@ func GetTopOwnerObject(ctx context.Context, f util.Factory, ns string, workload
}
// if pod is not empty, using pods to find top controller
if len(podList.Items) != 0 {
- _, controller, err = GetTopOwnerReference(f, ns, fmt.Sprintf("%s/%s", "pods", podList.Items[0].Name))
+ _, controller, err = getTopOwnerReference(f, ns, fmt.Sprintf("%s/%s", "pods", podList.Items[0].Name))
return object, controller, err
}
// if list is empty, means not create pods, just controllers
- _, controller, err = GetTopOwnerReferenceBySelector(f, ns, selector.String())
+ _, controller, err = getTopOwnerReferenceBySelector(f, ns, selector.String())
return object, controller, err
}
func IsK8sService(info *resource.Info) bool {
return info.Mapping.Resource.Resource == "services"
}
+ func getTopOwnerReference(factory util.Factory, ns, workload string) (object, controller *resource.Info, err error) {
+ object, err = GetUnstructuredObject(factory, ns, workload)
+ if err != nil {
+ return nil, nil, err
+ }
+ ownerRef := v2.GetControllerOf(object.Object.(*unstructured.Unstructured))
+ if ownerRef == nil {
+ return object, object, err
+ }
+ var owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name)
+ for {
+ controller, err = GetUnstructuredObject(factory, ns, owner)
+ if err != nil {
+ return nil, nil, err
+ }
+ ownerRef = v2.GetControllerOf(controller.Object.(*unstructured.Unstructured))
+ if ownerRef == nil {
+ return object, controller, nil
+ }
+ owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name)
+ }
+ }
+ // getTopOwnerReferenceBySelector assume pods, controller has same labels
+ func getTopOwnerReferenceBySelector(factory util.Factory, ns, selector string) (object, controller *resource.Info, err error) {
+ objectList, err := getUnstructuredObjectBySelector(factory, ns, selector)
+ if err != nil {
+ return nil, nil, err
+ }
+ for _, info := range objectList {
+ if IsK8sService(info) {
+ continue
+ }
+ return getTopOwnerReference(factory, ns, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
+ }
+ return nil, nil, fmt.Errorf("can not find controller for %s", selector)
+ }
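
Note: to make the moved helper concrete, getTopOwnerReference repeatedly follows the controller ownerReference until it reaches an object that has none. A self-contained imitation of that loop, with an in-memory store standing in for the API-server lookups (kinds and names are invented):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func obj(kind, name string, owner *metav1.OwnerReference) *unstructured.Unstructured {
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"kind":     kind,
		"metadata": map[string]interface{}{"name": name},
	}}
	if owner != nil {
		u.SetOwnerReferences([]metav1.OwnerReference{*owner})
	}
	return u
}

func main() {
	isController := true
	// Pod -> ReplicaSet -> Deployment, the usual chain the walk resolves.
	store := map[string]*unstructured.Unstructured{
		"Pod/web-x":         obj("Pod", "web-x", &metav1.OwnerReference{Kind: "ReplicaSet", Name: "web-rs", Controller: &isController}),
		"ReplicaSet/web-rs": obj("ReplicaSet", "web-rs", &metav1.OwnerReference{Kind: "Deployment", Name: "web", Controller: &isController}),
		"Deployment/web":    obj("Deployment", "web", nil),
	}
	cur := store["Pod/web-x"]
	for {
		ref := metav1.GetControllerOf(cur)
		if ref == nil {
			break // no controller: this is the top owner
		}
		cur = store[fmt.Sprintf("%s/%s", ref.Kind, ref.Name)]
	}
	fmt.Println(cur.GetKind(), cur.GetName()) // Deployment web
}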