fix: disconnect if operation connect has cancelled (#279)

This commit is contained in:
naison
2024-06-29 11:56:21 +08:00
committed by GitHub
parent 5cd7ef4a0a
commit deb4ec98f5
14 changed files with 76 additions and 40 deletions

View File

@@ -125,10 +125,10 @@ type cloneWarp struct {
} }
func (r *cloneWarp) Write(p []byte) (n int, err error) { func (r *cloneWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.CloneResponse{ _ = r.server.Send(&rpc.CloneResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newCloneWarp(server rpc.Daemon_CloneServer) io.Writer { func newCloneWarp(server rpc.Daemon_CloneServer) io.Writer {

View File

@@ -95,6 +95,9 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
return err return err
} }
if resp.Context().Err() != nil {
return resp.Context().Err()
}
svr.secondaryConnect = append(svr.secondaryConnect, connect) svr.secondaryConnect = append(svr.secondaryConnect, connect)
return nil return nil
} }
@@ -181,6 +184,9 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp
} }
} }
if resp.Context().Err() != nil {
return resp.Context().Err()
}
svr.secondaryConnect = append(svr.secondaryConnect, connect) svr.secondaryConnect = append(svr.secondaryConnect, connect)
return nil return nil
} }
@@ -190,10 +196,10 @@ type connectForkWarp struct {
} }
func (r *connectForkWarp) Write(p []byte) (n int, err error) { func (r *connectForkWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.ConnectResponse{ _ = r.server.Send(&rpc.ConnectResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newConnectForkWarp(server rpc.Daemon_ConnectForkServer) io.Writer { func newConnectForkWarp(server rpc.Daemon_ConnectForkServer) io.Writer {

View File

@@ -39,7 +39,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
return status.Error(codes.AlreadyExists, s) return status.Error(codes.AlreadyExists, s)
} }
defer func() { defer func() {
if e != nil { if e != nil || ctx.Err() != nil {
if svr.connect != nil { if svr.connect != nil {
svr.connect.Cleanup() svr.connect.Cleanup()
svr.connect = nil svr.connect = nil
@@ -201,6 +201,9 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
} }
} }
if resp.Context().Err() != nil {
return resp.Context().Err()
}
svr.t = time.Now() svr.t = time.Now()
svr.connect = connect svr.connect = connect
@@ -212,10 +215,10 @@ type warp struct {
} }
func (r *warp) Write(p []byte) (n int, err error) { func (r *warp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.ConnectResponse{ _ = r.server.Send(&rpc.ConnectResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newWarp(server rpc.Daemon_ConnectServer) io.Writer { func newWarp(server rpc.Daemon_ConnectServer) io.Writer {

View File

@@ -198,10 +198,10 @@ type disconnectWarp struct {
} }
func (r *disconnectWarp) Write(p []byte) (n int, err error) { func (r *disconnectWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.DisconnectResponse{ _ = r.server.Send(&rpc.DisconnectResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newDisconnectWarp(server rpc.Daemon_DisconnectServer) io.Writer { func newDisconnectWarp(server rpc.Daemon_DisconnectServer) io.Writer {

View File

@@ -29,13 +29,11 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
v4, _ := svr.connect.GetLocalTunIP() v4, _ := svr.connect.GetLocalTunIP()
for _, workload := range req.GetWorkloads() { for _, workload := range req.GetWorkloads() {
// add rollback func to remove envoy config // add rollback func to remove envoy config
log.Infof("leave workload %s", workload)
err := handler.UnPatchContainer(factory, maps, namespace, workload, v4) err := handler.UnPatchContainer(factory, maps, namespace, workload, v4)
if err != nil { if err != nil {
log.Errorf("leave workload %s failed: %v", workload, err) log.Errorf("leave workload %s failed: %v", workload, err)
continue continue
} }
log.Infof("leave workload %s successfully", workload)
} }
return nil return nil
} }
@@ -45,10 +43,10 @@ type leaveWarp struct {
} }
func (r *leaveWarp) Write(p []byte) (n int, err error) { func (r *leaveWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.LeaveResponse{ _ = r.server.Send(&rpc.LeaveResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newLeaveWarp(server rpc.Daemon_LeaveServer) io.Writer { func newLeaveWarp(server rpc.Daemon_LeaveServer) io.Writer {

View File

@@ -1,6 +1,7 @@
package action package action
import ( import (
"context"
"fmt" "fmt"
"io" "io"
@@ -21,7 +22,7 @@ import (
// 2. if already connect to cluster // 2. if already connect to cluster
// 2.1 disconnect from cluster // 2.1 disconnect from cluster
// 2.2 same as step 1 // 2.2 same as step 1
func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) error { func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) (e error) {
out := io.MultiWriter(newProxyWarp(resp), svr.LogFile) out := io.MultiWriter(newProxyWarp(resp), svr.LogFile)
log.SetOutput(out) log.SetOutput(out)
defer func() { defer func() {
@@ -66,6 +67,12 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
return err return err
} }
defer func() {
if e != nil && svr.connect != nil {
_ = svr.connect.LeaveProxyResources(context.Background())
}
}()
daemonClient := svr.GetClient(false) daemonClient := svr.GetClient(false)
if daemonClient == nil { if daemonClient == nil {
return fmt.Errorf("daemon is not avaliable") return fmt.Errorf("daemon is not avaliable")
@@ -147,10 +154,10 @@ type proxyWarp struct {
} }
func (r *proxyWarp) Write(p []byte) (n int, err error) { func (r *proxyWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.ConnectResponse{ _ = r.server.Send(&rpc.ConnectResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newProxyWarp(server rpc.Daemon_ProxyServer) io.Writer { func newProxyWarp(server rpc.Daemon_ProxyServer) io.Writer {

View File

@@ -56,10 +56,10 @@ func (r *quitWarp) Write(p []byte) (n int, err error) {
if r.server == nil { if r.server == nil {
return len(p), nil return len(p), nil
} }
err = r.server.Send(&rpc.QuitResponse{ _ = r.server.Send(&rpc.QuitResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newQuitWarp(server rpc.Daemon_QuitServer) io.Writer { func newQuitWarp(server rpc.Daemon_QuitServer) io.Writer {

View File

@@ -32,10 +32,10 @@ type removeWarp struct {
} }
func (r *removeWarp) Write(p []byte) (n int, err error) { func (r *removeWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.RemoveResponse{ _ = r.server.Send(&rpc.RemoveResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newRemoveWarp(server rpc.Daemon_RemoveServer) io.Writer { func newRemoveWarp(server rpc.Daemon_RemoveServer) io.Writer {

View File

@@ -58,10 +58,10 @@ type resetWarp struct {
} }
func (r *resetWarp) Write(p []byte) (n int, err error) { func (r *resetWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.ResetResponse{ _ = r.server.Send(&rpc.ResetResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newResetWarp(server rpc.Daemon_ResetServer) io.Writer { func newResetWarp(server rpc.Daemon_ResetServer) io.Writer {

View File

@@ -34,10 +34,10 @@ type stopWarp struct {
} }
func (r *stopWarp) Write(p []byte) (n int, err error) { func (r *stopWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.QuitResponse{ _ = r.server.Send(&rpc.QuitResponse{
Message: string(p), Message: string(p),
}) })
return len(p), err return len(p), nil
} }
func newStopWarp(server rpc.Daemon_QuitServer) io.Writer { func newStopWarp(server rpc.Daemon_QuitServer) io.Writer {

View File

@@ -45,9 +45,12 @@ func (c *Config) usingResolver(ctx context.Context) {
path := filepath.Join("/", "etc", "resolver") path := filepath.Join("/", "etc", "resolver")
if _, err := os.Stat(path); os.IsNotExist(err) { if _, err := os.Stat(path); os.IsNotExist(err) {
if err = os.MkdirAll(path, fs.ModePerm); err != nil { if err = os.MkdirAll(path, 0755); err != nil {
log.Errorf("create resolver error: %v", err) log.Errorf("create resolver error: %v", err)
} }
if err = os.Chmod(path, 0755); err != nil {
log.Errorf("chmod resolver error: %v", err)
}
} }
newConfig := miekgdns.ClientConfig{ newConfig := miekgdns.ClientConfig{
Servers: clientConfig.Servers, Servers: clientConfig.Servers,

View File

@@ -86,10 +86,10 @@ func (c *ConnectOptions) Cleanup() {
c.cancel() c.cancel()
} }
if c.dnsConfig != nil { if c.dnsConfig != nil {
log.Infof("clean up dns") log.Infof("cleanup dns")
c.dnsConfig.CancelDNS() c.dnsConfig.CancelDNS()
} }
log.Info("clean up successfully") log.Info("cleanup successfully")
} }
// vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99 // vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99

View File

@@ -157,12 +157,18 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
var empty bool var empty, found bool
empty, err = removeEnvoyConfig(mapInterface, nodeID, localTunIPv4) empty, found, err = removeEnvoyConfig(mapInterface, nodeID, localTunIPv4)
if err != nil { if err != nil {
log.Errorf("remove envoy config error: %v", err) log.Errorf("remove envoy config error: %v", err)
return err return err
} }
if !found {
log.Infof("not proxy resource %s", workload)
return nil
}
log.Infof("leave workload %s", workload)
mesh.RemoveContainers(templateSpec) mesh.RemoveContainers(templateSpec)
if u.GetAnnotations() != nil && u.GetAnnotations()[config.KubeVPNRestorePatchKey] != "" { if u.GetAnnotations() != nil && u.GetAnnotations()[config.KubeVPNRestorePatchKey] != "" {
@@ -292,33 +298,37 @@ func addVirtualRule(v []*controlplane.Virtual, nodeID string, port []v1.Containe
return v return v
} }
func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTunIPv4 string) (bool, error) { func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTunIPv4 string) (empty bool, found bool, err error) {
configMap, err := mapInterface.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) configMap, err := mapInterface.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if k8serrors.IsNotFound(err) { if k8serrors.IsNotFound(err) {
return true, nil return true, false, nil
} }
if err != nil { if err != nil {
return false, err return false, false, err
} }
str, ok := configMap.Data[config.KeyEnvoy] str, ok := configMap.Data[config.KeyEnvoy]
if !ok { if !ok {
return false, errors.New("can not found value for key: envoy-config.yaml") return false, false, errors.New("can not found value for key: envoy-config.yaml")
} }
var v []*controlplane.Virtual var v []*controlplane.Virtual
if err = yaml.Unmarshal([]byte(str), &v); err != nil { if err = yaml.Unmarshal([]byte(str), &v); err != nil {
return false, err return false, false, err
} }
for _, virtual := range v { for _, virtual := range v {
if nodeID == virtual.Uid { if nodeID == virtual.Uid {
for i := 0; i < len(virtual.Rules); i++ { for i := 0; i < len(virtual.Rules); i++ {
if virtual.Rules[i].LocalTunIPv4 == localTunIPv4 { if virtual.Rules[i].LocalTunIPv4 == localTunIPv4 {
found = true
virtual.Rules = append(virtual.Rules[:i], virtual.Rules[i+1:]...) virtual.Rules = append(virtual.Rules[:i], virtual.Rules[i+1:]...)
i-- i--
} }
} }
} }
} }
var empty bool if !found {
return false, false, nil
}
// remove default // remove default
for i := 0; i < len(v); i++ { for i := 0; i < len(v); i++ {
if nodeID == v[i].Uid && len(v[i].Rules) == 0 { if nodeID == v[i].Uid && len(v[i].Rules) == 0 {
@@ -330,11 +340,11 @@ func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, local
var bytes []byte var bytes []byte
bytes, err = yaml.Marshal(v) bytes, err = yaml.Marshal(v)
if err != nil { if err != nil {
return false, err return false, found, err
} }
configMap.Data[config.KeyEnvoy] = string(bytes) configMap.Data[config.KeyEnvoy] = string(bytes)
_, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{}) _, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{})
return empty, err return empty, found, err
} }
func contains(a map[string]string, sub map[string]string) bool { func contains(a map[string]string, sub map[string]string) bool {

View File

@@ -75,16 +75,25 @@ func (c *ConnectOptions) LeaveProxyResources(ctx context.Context) (err error) {
} }
v4, _ := c.GetLocalTunIP() v4, _ := c.GetLocalTunIP()
for _, virtual := range v { for _, virtual := range v {
var found bool
for _, rule := range virtual.Rules {
if rule.LocalTunIPv4 == v4 {
found = true
break
}
}
if !found {
continue
}
// deployments.apps.ry-server --> deployments.apps/ry-server // deployments.apps.ry-server --> deployments.apps/ry-server
lastIndex := strings.LastIndex(virtual.Uid, ".") lastIndex := strings.LastIndex(virtual.Uid, ".")
uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:] uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:]
log.Infof("leave resource: %s", uid)
err = UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, v4) err = UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, v4)
if err != nil { if err != nil {
log.Errorf("unpatch container error: %v", err) log.Errorf("leave workload %s failed: %v", uid, err)
continue continue
} }
log.Infof("leave resource: %s successfully", uid)
} }
return err return err
} }