mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-10-05 07:16:54 +08:00
feat: use dns query as port-forward health check (#570)
This commit is contained in:
@@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"net"
|
||||
"net/url"
|
||||
"reflect"
|
||||
@@ -14,13 +12,17 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/containernetworking/cni/pkg/types"
|
||||
"github.com/libp2p/go-netroute"
|
||||
miekgdns "github.com/miekg/dns"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"gvisor.dev/gvisor/pkg/tcpip"
|
||||
"gvisor.dev/gvisor/pkg/tcpip/stack"
|
||||
admissionv1 "k8s.io/api/admissionregistration/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apinetworkingv1 "k8s.io/api/networking/v1"
|
||||
@@ -34,11 +36,13 @@ import (
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
runtimeresource "k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
v2 "k8s.io/client-go/kubernetes/typed/networking/v1"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/kubectl/pkg/cmd/set"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
"k8s.io/kubectl/pkg/polymorphichelpers"
|
||||
@@ -237,15 +241,29 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <-
|
||||
plog.G(ctx).Errorf("Add extra node IP failed: %v", err)
|
||||
return
|
||||
}
|
||||
var tcpForwardPort int
|
||||
tcpForwardPort, err = util.GetAvailableTCPPortOrDie()
|
||||
var rawTCPForwardPort, gvisorTCPForwardPort, gvisorUDPForwardPort int
|
||||
rawTCPForwardPort, err = util.GetAvailableTCPPortOrDie()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gvisorTCPForwardPort, err = util.GetAvailableTCPPortOrDie()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
gvisorUDPForwardPort, err = util.GetAvailableTCPPortOrDie()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
plog.G(ctx).Info("Forwarding port...")
|
||||
portPair := []string{fmt.Sprintf("%d:10800", tcpForwardPort)}
|
||||
portPair := []string{
|
||||
fmt.Sprintf("%d:10800", rawTCPForwardPort),
|
||||
fmt.Sprintf("%d:10802", gvisorUDPForwardPort),
|
||||
}
|
||||
if c.Engine == config.EngineGvisor {
|
||||
portPair = []string{fmt.Sprintf("%d:10801", tcpForwardPort)}
|
||||
portPair = []string{
|
||||
fmt.Sprintf("%d:10801", gvisorTCPForwardPort),
|
||||
fmt.Sprintf("%d:10802", gvisorUDPForwardPort),
|
||||
}
|
||||
}
|
||||
if err = c.portForward(c.ctx, portPair); err != nil {
|
||||
return
|
||||
@@ -253,7 +271,10 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <-
|
||||
if util.IsWindows() {
|
||||
driver.InstallWireGuardTunDriver()
|
||||
}
|
||||
forward := fmt.Sprintf("tcp://127.0.0.1:%d", tcpForwardPort)
|
||||
forward := fmt.Sprintf("tcp://127.0.0.1:%d", rawTCPForwardPort)
|
||||
if c.Engine == config.EngineGvisor {
|
||||
forward = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort)
|
||||
}
|
||||
if err = c.startLocalTunServer(c.ctx, forward, isLite); err != nil {
|
||||
plog.G(ctx).Errorf("Start local tun service failed: %v", err)
|
||||
return
|
||||
@@ -279,7 +300,9 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
|
||||
defer firstCancelFunc()
|
||||
var errChan = make(chan error, 1)
|
||||
go func() {
|
||||
runtime.ErrorHandlers = runtime.ErrorHandlers[0:0]
|
||||
runtime.ErrorHandlers = []runtime.ErrorHandler{func(ctx context.Context, err error, msg string, keysAndValues ...interface{}) {
|
||||
plog.G(ctx).Error(err)
|
||||
}}
|
||||
var first = pointer.Bool(true)
|
||||
for ctx.Err() == nil {
|
||||
func() {
|
||||
@@ -288,7 +311,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
|
||||
sortBy := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) }
|
||||
label := fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String()
|
||||
_, _, _ = polymorphichelpers.GetFirstPod(c.clientset.CoreV1(), c.Namespace, label, time.Second*5, sortBy)
|
||||
ctx2, cancelFunc2 := context.WithTimeout(ctx, time.Second*5)
|
||||
ctx2, cancelFunc2 := context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancelFunc2()
|
||||
podList, err := c.GetRunningPodList(ctx2)
|
||||
if err != nil {
|
||||
@@ -307,18 +330,19 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
|
||||
podName := pod.GetName()
|
||||
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
|
||||
go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
|
||||
go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[0], ":")[0])
|
||||
go c.heartbeats(childCtx, util.GetPodIP(pod)...)
|
||||
if *first {
|
||||
go func() {
|
||||
select {
|
||||
case <-readyChan:
|
||||
firstCancelFunc()
|
||||
case <-childCtx.Done():
|
||||
go healthCheck(childCtx, cancelFunc, readyChan, strings.Split(portPair[1], ":")[0], fmt.Sprintf("%s.%s", config.ConfigMapPodTrafficManager, c.Namespace))
|
||||
go func() {
|
||||
select {
|
||||
case <-readyChan:
|
||||
for _, pair := range portPair {
|
||||
ports := strings.Split(pair, ":")
|
||||
plog.G(ctx).Infof("Forwarding from %s -> %s", net.JoinHostPort("127.0.0.1", ports[0]), ports[1])
|
||||
}
|
||||
}()
|
||||
}
|
||||
var out = plog.G(ctx).Out
|
||||
firstCancelFunc()
|
||||
case <-childCtx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
err = util.PortForwardPod(
|
||||
c.config,
|
||||
c.restclient,
|
||||
@@ -327,8 +351,8 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
|
||||
portPair,
|
||||
readyChan,
|
||||
childCtx.Done(),
|
||||
out,
|
||||
out,
|
||||
nil,
|
||||
plog.G(ctx).Out,
|
||||
)
|
||||
if *first {
|
||||
util.SafeWrite(errChan, err)
|
||||
@@ -1214,29 +1238,57 @@ func (c *ConnectOptions) ProxyResources() ProxyList {
|
||||
return c.proxyWorkloads
|
||||
}
|
||||
|
||||
func (c *ConnectOptions) heartbeats(ctx context.Context, ips ...string) {
|
||||
var dstIPv4, dstIPv6 net.IP
|
||||
for _, podIP := range ips {
|
||||
ip := net.ParseIP(podIP)
|
||||
if ip == nil {
|
||||
continue
|
||||
}
|
||||
if ip.To4() != nil {
|
||||
dstIPv4 = ip
|
||||
} else {
|
||||
dstIPv6 = ip
|
||||
}
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(config.KeepAliveTime)
|
||||
func healthCheck(ctx context.Context, cancelFunc context.CancelFunc, readyChan chan struct{}, localGvisorUDPPort string, domain string) {
|
||||
defer cancelFunc()
|
||||
ticker := time.NewTicker(time.Second * 60)
|
||||
defer ticker.Stop()
|
||||
|
||||
for ; ctx.Err() == nil; <-ticker.C {
|
||||
if dstIPv4 != nil && c.localTunIPv4 != nil {
|
||||
util.Ping(ctx, c.localTunIPv4.IP.String(), dstIPv4.String())
|
||||
select {
|
||||
case <-readyChan:
|
||||
case <-ticker.C:
|
||||
plog.G(ctx).Debugf("Wait port-forward to be ready timeout")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
var healthChecker = func() error {
|
||||
conn, err := net.Dial("tcp", fmt.Sprintf(":%s", localGvisorUDPPort))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if dstIPv6 != nil && c.localTunIPv6 != nil {
|
||||
util.Ping(ctx, c.localTunIPv6.IP.String(), dstIPv6.String())
|
||||
defer conn.Close()
|
||||
err = util.WriteProxyInfo(conn, stack.TransportEndpointID{
|
||||
LocalPort: 53,
|
||||
LocalAddress: tcpip.AddrFrom4Slice(net.ParseIP("127.0.0.1").To4()),
|
||||
RemotePort: 0,
|
||||
RemoteAddress: tcpip.AddrFrom4Slice(net.IPv4zero.To4()),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
packetConn, _ := core.NewPacketConnOverTCP(ctx, conn)
|
||||
defer packetConn.Close()
|
||||
|
||||
msg := new(miekgdns.Msg)
|
||||
msg.SetQuestion(miekgdns.Fqdn(domain), miekgdns.TypeA)
|
||||
client := miekgdns.Client{Net: "udp", Timeout: time.Second * 10}
|
||||
_, _, err = client.ExchangeWithConnContext(ctx, msg, &miekgdns.Conn{Conn: packetConn})
|
||||
return err
|
||||
}
|
||||
|
||||
newTicker := time.NewTicker(time.Second * 10)
|
||||
defer newTicker.Stop()
|
||||
for ; ctx.Err() == nil; <-newTicker.C {
|
||||
err := retry.OnError(wait.Backoff{Duration: time.Second * 5, Steps: 4}, func(err error) bool {
|
||||
return err != nil
|
||||
}, func() error {
|
||||
return healthChecker()
|
||||
})
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("Failed to query DNS: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user