diff --git a/charts/kubevpn/templates/deployment.yaml b/charts/kubevpn/templates/deployment.yaml index f79ff362..65cff00f 100644 --- a/charts/kubevpn/templates/deployment.yaml +++ b/charts/kubevpn/templates/deployment.yaml @@ -45,9 +45,9 @@ spec: ip6tables -P FORWARD ACCEPT iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE - kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" + kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802" {{- else }} - - kubevpn server -l "tcp://:10800" -l "gtcp://:10801" + - kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802" {{- end }} command: - /bin/sh diff --git a/pkg/core/gvisortcpforwarder.go b/pkg/core/gvisortcpforwarder.go index e109fdb7..364b9d44 100644 --- a/pkg/core/gvisortcpforwarder.go +++ b/pkg/core/gvisortcpforwarder.go @@ -1,16 +1,13 @@ package core import ( - "bytes" "context" - "encoding/binary" "fmt" "io" "net" "time" "github.com/pkg/errors" - "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" "gvisor.dev/gvisor/pkg/tcpip/stack" "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" @@ -70,65 +67,3 @@ func TCPForwarder(ctx context.Context, s *stack.Stack) func(stack.TransportEndpo } }).HandlePacket } - -func WriteProxyInfo(conn net.Conn, id stack.TransportEndpointID) error { - var b bytes.Buffer - buf := config.LPool.Get().([]byte)[:] - defer config.LPool.Put(buf[:]) - // local port - binary.BigEndian.PutUint16(buf, id.LocalPort) - b.Write(buf) - - // remote port - binary.BigEndian.PutUint16(buf, id.RemotePort) - b.Write(buf) - - // local address - b.WriteByte(byte(id.LocalAddress.Len())) - b.Write(id.LocalAddress.AsSlice()) - - // remote address - b.WriteByte(byte(id.RemoteAddress.Len())) - b.Write(id.RemoteAddress.AsSlice()) - _, err := b.WriteTo(conn) - return err -} - -// ParseProxyInfo parse proxy info [20]byte -func ParseProxyInfo(conn net.Conn) (id stack.TransportEndpointID, err error) { - var n int - var port = make([]byte, 2) - - // local port - if n, err = io.ReadFull(conn, port); err != nil || n != 2 { - return - } - id.LocalPort = binary.BigEndian.Uint16(port) - - // remote port - if n, err = io.ReadFull(conn, port); err != nil || n != 2 { - return - } - id.RemotePort = binary.BigEndian.Uint16(port) - - // local address - if n, err = io.ReadFull(conn, port[:1]); err != nil || n != 1 { - return - } - var localAddress = make([]byte, port[0]) - if n, err = io.ReadFull(conn, localAddress); err != nil || n != len(localAddress) { - return - } - id.LocalAddress = tcpip.AddrFromSlice(localAddress) - - // remote address - if n, err = io.ReadFull(conn, port[:1]); err != nil || n != 1 { - return - } - var remoteAddress = make([]byte, port[0]) - if n, err = io.ReadFull(conn, remoteAddress); err != nil || n != len(remoteAddress) { - return - } - id.RemoteAddress = tcpip.AddrFromSlice(remoteAddress) - return -} diff --git a/pkg/core/gvisorudphandler.go b/pkg/core/gvisorudphandler.go index c90ac9f3..60e44d3a 100644 --- a/pkg/core/gvisorudphandler.go +++ b/pkg/core/gvisorudphandler.go @@ -3,9 +3,12 @@ package core import ( "context" "fmt" + "io" "net" "time" + "github.com/pkg/errors" + "github.com/wencaiwulue/kubevpn/v2/pkg/config" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/util" @@ -19,14 +22,14 @@ func GvisorUDPHandler() Handler { func (h *gvisorUDPHandler) Handle(ctx context.Context, tcpConn net.Conn) { defer tcpConn.Close() - plog.G(ctx).Infof("[TUN-UDP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) + plog.G(ctx).Debugf("[TUN-UDP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) // 1, get proxy info - endpointID, err := ParseProxyInfo(tcpConn) + endpointID, err := util.ParseProxyInfo(tcpConn) if err != nil { plog.G(ctx).Errorf("[TUN-UDP] Failed to parse proxy info: %v", err) return } - plog.G(ctx).Infof("[TUN-UDP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", + plog.G(ctx).Debugf("[TUN-UDP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress: %s", endpointID.LocalPort, endpointID.LocalAddress.String(), endpointID.RemotePort, endpointID.RemoteAddress.String(), ) // 2, dial proxy @@ -104,7 +107,7 @@ func GvisorUDPListener(addr string) (net.Listener, error) { func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { defer udpConn.Close() - plog.G(ctx).Infof("[TUN-UDP] %s <-> %s", tcpConn.RemoteAddr(), udpConn.LocalAddr()) + plog.G(ctx).Debugf("[TUN-UDP] %s <-> %s", tcpConn.RemoteAddr(), udpConn.LocalAddr()) errChan := make(chan error, 2) go func() { defer util.HandleCrash() @@ -114,34 +117,29 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { for ctx.Err() == nil { err := tcpConn.SetReadDeadline(time.Now().Add(time.Second * 30)) if err != nil { - plog.G(ctx).Errorf("[TUN-UDP] Failed to set read deadline: %v", err) - errChan <- err + errChan <- errors.WithMessage(err, "set read deadline failed") return } datagram, err := readDatagramPacket(tcpConn, buf) if err != nil { - plog.G(ctx).Errorf("[TUN-UDP] %s -> %s: %v", tcpConn.RemoteAddr(), udpConn.LocalAddr(), err) - errChan <- err + errChan <- errors.WithMessage(err, "read datagram packet failed") return } if datagram.DataLength == 0 { - plog.G(ctx).Errorf("[TUN-UDP] Length is zero") errChan <- fmt.Errorf("length of read packet is zero") return } err = udpConn.SetWriteDeadline(time.Now().Add(time.Second * 30)) if err != nil { - plog.G(ctx).Errorf("[TUN-UDP] Failed to set write deadline: %v", err) - errChan <- err + errChan <- errors.WithMessage(err, "set write deadline failed") return } if _, err = udpConn.Write(datagram.Data[:datagram.DataLength]); err != nil { - plog.G(ctx).Errorf("[TUN-UDP] %s -> %s : %s", tcpConn.RemoteAddr(), "localhost:8422", err) - errChan <- err + errChan <- errors.WithMessage(err, "write datagram packet failed") return } - plog.G(ctx).Infof("[TUN-UDP] %s >>> %s length: %d", tcpConn.RemoteAddr(), "localhost:8422", datagram.DataLength) + plog.G(ctx).Debugf("[TUN-UDP] %s >>> %s length: %d", tcpConn.RemoteAddr(), udpConn.RemoteAddr(), datagram.DataLength) } }() @@ -153,18 +151,15 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { for ctx.Err() == nil { err := udpConn.SetReadDeadline(time.Now().Add(time.Second * 30)) if err != nil { - plog.G(ctx).Errorf("[TUN-UDP] Failed to set read deadline failed: %v", err) - errChan <- err + errChan <- errors.WithMessage(err, "set read deadline failed") return } n, _, err := udpConn.ReadFrom(buf[:]) if err != nil { - plog.G(ctx).Errorf("[TUN-UDP] %s : %s", tcpConn.RemoteAddr(), err) - errChan <- err + errChan <- errors.WithMessage(err, "read datagram packet failed") return } if n == 0 { - plog.G(ctx).Errorf("[TUN-UDP] Length is zero") errChan <- fmt.Errorf("length of read packet is zero") return } @@ -172,23 +167,21 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { // pipe from peer to tunnel err = tcpConn.SetWriteDeadline(time.Now().Add(time.Second * 30)) if err != nil { - plog.G(ctx).Errorf("[TUN-UDP] Error: set write deadline failed: %v", err) - errChan <- err + errChan <- errors.WithMessage(err, "set write deadline failed") return } packet := newDatagramPacket(buf, n) if err = packet.Write(tcpConn); err != nil { - plog.G(ctx).Errorf("[TUN-UDP] Error: %s <- %s : %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), err) errChan <- err return } - plog.G(ctx).Infof("[TUN-UDP] %s <<< %s length: %d", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), len(packet.Data)) + plog.G(ctx).Debugf("[TUN-UDP] %s <<< %s length: %d", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), packet.DataLength) } }() err := <-errChan - if err != nil { + if err != nil && !errors.Is(err, io.EOF) { plog.G(ctx).Errorf("[TUN-UDP] %v", err) } - plog.G(ctx).Infof("[TUN-UDP] %s >-< %s", tcpConn.RemoteAddr(), udpConn.LocalAddr()) + plog.G(ctx).Debugf("[TUN-UDP] %s >-< %s", tcpConn.RemoteAddr(), udpConn.LocalAddr()) return } diff --git a/pkg/core/packetconn.go b/pkg/core/packetconn.go new file mode 100644 index 00000000..766e0643 --- /dev/null +++ b/pkg/core/packetconn.go @@ -0,0 +1,69 @@ +package core + +import ( + "context" + "net" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" +) + +var _ net.PacketConn = (*PacketConnOverTCP)(nil) + +type PacketConnOverTCP struct { + // tcp connection + net.Conn + ctx context.Context +} + +func NewPacketConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) { + return &PacketConnOverTCP{ctx: ctx, Conn: conn}, nil +} + +func (c *PacketConnOverTCP) ReadFrom(b []byte) (int, net.Addr, error) { + select { + case <-c.ctx.Done(): + return 0, nil, c.ctx.Err() + default: + datagram, err := readDatagramPacket(c.Conn, b) + if err != nil { + return 0, nil, err + } + return int(datagram.DataLength), nil, nil + } +} + +func (c *PacketConnOverTCP) Read(b []byte) (int, error) { + n, _, err := c.ReadFrom(b) + return n, err +} + +func (c *PacketConnOverTCP) WriteTo(b []byte, _ net.Addr) (int, error) { + if len(b) == 0 { + return 0, nil + } + + buf := config.LPool.Get().([]byte)[:] + n := copy(buf, b) + defer config.LPool.Put(buf) + + packet := newDatagramPacket(buf, n) + if err := packet.Write(c.Conn); err != nil { + return 0, err + } + return len(b), nil +} + +func (c *PacketConnOverTCP) Write(b []byte) (int, error) { + n, err := c.WriteTo(b, nil) + return n, err +} + +func (c *PacketConnOverTCP) Close() error { + if cc, ok := c.Conn.(interface{ CloseRead() error }); ok { + _ = cc.CloseRead() + } + if cc, ok := c.Conn.(interface{ CloseWrite() error }); ok { + _ = cc.CloseWrite() + } + return c.Conn.Close() +} diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index e34ec4cc..0539c534 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -4,8 +4,6 @@ import ( "context" "encoding/json" "fmt" - "sync/atomic" - "net" "net/url" "reflect" @@ -14,13 +12,17 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/containernetworking/cni/pkg/types" "github.com/libp2p/go-netroute" + miekgdns "github.com/miekg/dns" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "google.golang.org/grpc/metadata" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/stack" admissionv1 "k8s.io/api/admissionregistration/v1" v1 "k8s.io/api/core/v1" apinetworkingv1 "k8s.io/api/networking/v1" @@ -34,11 +36,13 @@ import ( utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/resource" runtimeresource "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" v2 "k8s.io/client-go/kubernetes/typed/networking/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/util/retry" "k8s.io/kubectl/pkg/cmd/set" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/polymorphichelpers" @@ -237,15 +241,29 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <- plog.G(ctx).Errorf("Add extra node IP failed: %v", err) return } - var tcpForwardPort int - tcpForwardPort, err = util.GetAvailableTCPPortOrDie() + var rawTCPForwardPort, gvisorTCPForwardPort, gvisorUDPForwardPort int + rawTCPForwardPort, err = util.GetAvailableTCPPortOrDie() + if err != nil { + return err + } + gvisorTCPForwardPort, err = util.GetAvailableTCPPortOrDie() + if err != nil { + return err + } + gvisorUDPForwardPort, err = util.GetAvailableTCPPortOrDie() if err != nil { return err } plog.G(ctx).Info("Forwarding port...") - portPair := []string{fmt.Sprintf("%d:10800", tcpForwardPort)} + portPair := []string{ + fmt.Sprintf("%d:10800", rawTCPForwardPort), + fmt.Sprintf("%d:10802", gvisorUDPForwardPort), + } if c.Engine == config.EngineGvisor { - portPair = []string{fmt.Sprintf("%d:10801", tcpForwardPort)} + portPair = []string{ + fmt.Sprintf("%d:10801", gvisorTCPForwardPort), + fmt.Sprintf("%d:10802", gvisorUDPForwardPort), + } } if err = c.portForward(c.ctx, portPair); err != nil { return @@ -253,7 +271,10 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <- if util.IsWindows() { driver.InstallWireGuardTunDriver() } - forward := fmt.Sprintf("tcp://127.0.0.1:%d", tcpForwardPort) + forward := fmt.Sprintf("tcp://127.0.0.1:%d", rawTCPForwardPort) + if c.Engine == config.EngineGvisor { + forward = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort) + } if err = c.startLocalTunServer(c.ctx, forward, isLite); err != nil { plog.G(ctx).Errorf("Start local tun service failed: %v", err) return @@ -279,7 +300,9 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err defer firstCancelFunc() var errChan = make(chan error, 1) go func() { - runtime.ErrorHandlers = runtime.ErrorHandlers[0:0] + runtime.ErrorHandlers = []runtime.ErrorHandler{func(ctx context.Context, err error, msg string, keysAndValues ...interface{}) { + plog.G(ctx).Error(err) + }} var first = pointer.Bool(true) for ctx.Err() == nil { func() { @@ -288,7 +311,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err sortBy := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) } label := fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String() _, _, _ = polymorphichelpers.GetFirstPod(c.clientset.CoreV1(), c.Namespace, label, time.Second*5, sortBy) - ctx2, cancelFunc2 := context.WithTimeout(ctx, time.Second*5) + ctx2, cancelFunc2 := context.WithTimeout(ctx, time.Second*10) defer cancelFunc2() podList, err := c.GetRunningPodList(ctx2) if err != nil { @@ -307,18 +330,19 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err podName := pod.GetName() // try to detect pod is delete event, if pod is deleted, needs to redo port-forward go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace)) - go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[0], ":")[0]) - go c.heartbeats(childCtx, util.GetPodIP(pod)...) - if *first { - go func() { - select { - case <-readyChan: - firstCancelFunc() - case <-childCtx.Done(): + go healthCheck(childCtx, cancelFunc, readyChan, strings.Split(portPair[1], ":")[0], fmt.Sprintf("%s.%s", config.ConfigMapPodTrafficManager, c.Namespace)) + go func() { + select { + case <-readyChan: + for _, pair := range portPair { + ports := strings.Split(pair, ":") + plog.G(ctx).Infof("Forwarding from %s -> %s", net.JoinHostPort("127.0.0.1", ports[0]), ports[1]) } - }() - } - var out = plog.G(ctx).Out + firstCancelFunc() + case <-childCtx.Done(): + } + }() + err = util.PortForwardPod( c.config, c.restclient, @@ -327,8 +351,8 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err portPair, readyChan, childCtx.Done(), - out, - out, + nil, + plog.G(ctx).Out, ) if *first { util.SafeWrite(errChan, err) @@ -1214,29 +1238,57 @@ func (c *ConnectOptions) ProxyResources() ProxyList { return c.proxyWorkloads } -func (c *ConnectOptions) heartbeats(ctx context.Context, ips ...string) { - var dstIPv4, dstIPv6 net.IP - for _, podIP := range ips { - ip := net.ParseIP(podIP) - if ip == nil { - continue - } - if ip.To4() != nil { - dstIPv4 = ip - } else { - dstIPv6 = ip - } - } - - ticker := time.NewTicker(config.KeepAliveTime) +func healthCheck(ctx context.Context, cancelFunc context.CancelFunc, readyChan chan struct{}, localGvisorUDPPort string, domain string) { + defer cancelFunc() + ticker := time.NewTicker(time.Second * 60) defer ticker.Stop() - for ; ctx.Err() == nil; <-ticker.C { - if dstIPv4 != nil && c.localTunIPv4 != nil { - util.Ping(ctx, c.localTunIPv4.IP.String(), dstIPv4.String()) + select { + case <-readyChan: + case <-ticker.C: + plog.G(ctx).Debugf("Wait port-forward to be ready timeout") + return + case <-ctx.Done(): + return + } + + var healthChecker = func() error { + conn, err := net.Dial("tcp", fmt.Sprintf(":%s", localGvisorUDPPort)) + if err != nil { + return err } - if dstIPv6 != nil && c.localTunIPv6 != nil { - util.Ping(ctx, c.localTunIPv6.IP.String(), dstIPv6.String()) + defer conn.Close() + err = util.WriteProxyInfo(conn, stack.TransportEndpointID{ + LocalPort: 53, + LocalAddress: tcpip.AddrFrom4Slice(net.ParseIP("127.0.0.1").To4()), + RemotePort: 0, + RemoteAddress: tcpip.AddrFrom4Slice(net.IPv4zero.To4()), + }) + if err != nil { + return err + } + + packetConn, _ := core.NewPacketConnOverTCP(ctx, conn) + defer packetConn.Close() + + msg := new(miekgdns.Msg) + msg.SetQuestion(miekgdns.Fqdn(domain), miekgdns.TypeA) + client := miekgdns.Client{Net: "udp", Timeout: time.Second * 10} + _, _, err = client.ExchangeWithConnContext(ctx, msg, &miekgdns.Conn{Conn: packetConn}) + return err + } + + newTicker := time.NewTicker(time.Second * 10) + defer newTicker.Stop() + for ; ctx.Err() == nil; <-newTicker.C { + err := retry.OnError(wait.Backoff{Duration: time.Second * 5, Steps: 4}, func(err error) bool { + return err != nil + }, func() error { + return healthChecker() + }) + if err != nil { + plog.G(ctx).Errorf("Failed to query DNS: %v", err) + return } } } diff --git a/pkg/handler/remote.go b/pkg/handler/remote.go index ea7b19f7..18239a11 100644 --- a/pkg/handler/remote.go +++ b/pkg/handler/remote.go @@ -363,7 +363,7 @@ func genDeploySpec(namespace string, udp8422 string, tcp10800 string, tcp9002 st Args: []string{util.If( gvisor, ` -kubevpn server -l "tcp://:10800" -l "gtcp://:10801"`, +kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802"`, ` echo 1 > /proc/sys/net/ipv4/ip_forward echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6 @@ -375,7 +375,7 @@ iptables -P FORWARD ACCEPT ip6tables -P FORWARD ACCEPT iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE -kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801"`, +kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802"`, )}, EnvFrom: []v1.EnvFromSource{{ SecretRef: &v1.SecretEnvSource{ diff --git a/pkg/util/gvisor.go b/pkg/util/gvisor.go new file mode 100644 index 00000000..071057eb --- /dev/null +++ b/pkg/util/gvisor.go @@ -0,0 +1,75 @@ +package util + +import ( + "bytes" + "encoding/binary" + "io" + "net" + + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/stack" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" +) + +func WriteProxyInfo(conn net.Conn, id stack.TransportEndpointID) error { + var b bytes.Buffer + buf := config.LPool.Get().([]byte)[:] + defer config.LPool.Put(buf[:]) + // local port + binary.BigEndian.PutUint16(buf, id.LocalPort) + b.Write(buf[:2]) + + // remote port + binary.BigEndian.PutUint16(buf, id.RemotePort) + b.Write(buf[:2]) + + // local address + b.WriteByte(byte(id.LocalAddress.Len())) + b.Write(id.LocalAddress.AsSlice()) + + // remote address + b.WriteByte(byte(id.RemoteAddress.Len())) + b.Write(id.RemoteAddress.AsSlice()) + _, err := b.WriteTo(conn) + return err +} + +// ParseProxyInfo parse proxy info [20]byte +func ParseProxyInfo(conn net.Conn) (id stack.TransportEndpointID, err error) { + var n int + var port = make([]byte, 2) + + // local port + if n, err = io.ReadFull(conn, port); err != nil || n != 2 { + return + } + id.LocalPort = binary.BigEndian.Uint16(port) + + // remote port + if n, err = io.ReadFull(conn, port); err != nil || n != 2 { + return + } + id.RemotePort = binary.BigEndian.Uint16(port) + + // local address + if n, err = io.ReadFull(conn, port[:1]); err != nil || n != 1 { + return + } + var localAddress = make([]byte, port[0]) + if n, err = io.ReadFull(conn, localAddress); err != nil || n != len(localAddress) { + return + } + id.LocalAddress = tcpip.AddrFromSlice(localAddress) + + // remote address + if n, err = io.ReadFull(conn, port[:1]); err != nil || n != 1 { + return + } + var remoteAddress = make([]byte, port[0]) + if n, err = io.ReadFull(conn, remoteAddress); err != nil || n != len(remoteAddress) { + return + } + id.RemoteAddress = tcpip.AddrFromSlice(remoteAddress) + return +} diff --git a/pkg/util/pod.go b/pkg/util/pod.go index 2bcf9a1f..c8470965 100644 --- a/pkg/util/pod.go +++ b/pkg/util/pod.go @@ -335,7 +335,7 @@ func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName return err != nil }, func() error { - ctx1, cancelFunc1 := context.WithTimeout(ctx, time.Second*5) + ctx1, cancelFunc1 := context.WithTimeout(ctx, time.Second*10) defer cancelFunc1() _, err := podInterface.Get(ctx1, podName, v1.GetOptions{}) return err @@ -382,32 +382,6 @@ func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName } } -func CheckPortStatus(ctx context.Context, cancelFunc context.CancelFunc, readyChan chan struct{}, localRandomTCPPort string) { - defer cancelFunc() - ticker := time.NewTicker(time.Second * 60) - defer ticker.Stop() - - select { - case <-readyChan: - case <-ticker.C: - plog.G(ctx).Debugf("Wait port-forward to be ready timeout") - return - case <-ctx.Done(): - return - } - - for ctx.Err() == nil { - var lc net.ListenConfig - conn, err := lc.Listen(ctx, "tcp", net.JoinHostPort("127.0.0.1", localRandomTCPPort)) - if err == nil { - _ = conn.Close() - plog.G(ctx).Debugf("Local port: %s is free", localRandomTCPPort) - return - } - time.Sleep(time.Second * 1) - } -} - func Rollback(f util.Factory, ns, workload string) { r := f.NewBuilder(). WithScheme(scheme2.Scheme, scheme2.Scheme.PrioritizedVersionsAllGroups()...).