From b0ed57794f15d71c0886b86d965252051aec4d30 Mon Sep 17 00:00:00 2001 From: fengcaiwen Date: Mon, 17 Jul 2023 12:54:00 +0800 Subject: [PATCH] fix: fix cidr-domain not works bug --- pkg/dns/dns.go | 22 ++++----- pkg/handler/connect.go | 86 +++++++++++++++++++++++++++++------- pkg/handler/function_test.go | 16 +++++++ 3 files changed, 97 insertions(+), 27 deletions(-) diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 98fbfa7f..a2829127 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -82,14 +82,14 @@ func GetDNSIPFromDnsPod(clientset *kubernetes.Clientset) (ips []string, err erro return } -func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface) { +func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts ...Entry) { rateLimiter := flowcontrol.NewTokenBucketRateLimiter(0.2, 1) defer rateLimiter.Stop() var last string serviceList, err := serviceInterface.List(ctx, v1.ListOptions{}) if err == nil && len(serviceList.Items) != 0 { - entry := generateHostsEntry(serviceList.Items) + entry := generateHostsEntry(serviceList.Items, hosts) if err = updateHosts(entry); err == nil { last = entry } @@ -126,7 +126,7 @@ func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInte if err != nil { return } - entry := generateHostsEntry(list.Items) + entry := generateHostsEntry(list.Items, hosts) if entry == last { continue } @@ -180,15 +180,14 @@ func updateHosts(str string) error { return os.WriteFile(path, []byte(strings.Join(strList, "\n")), 0644) } -func generateHostsEntry(list []v12.Service) string { - type entry struct { - IP string - Domain string - } +type Entry struct { + IP string + Domain string +} +func generateHostsEntry(list []v12.Service, hosts []Entry) string { const ServiceKubernetes = "kubernetes" - - var entryList []entry + var entryList []Entry for _, item := range list { if strings.EqualFold(item.Name, ServiceKubernetes) { @@ -201,7 +200,7 @@ func generateHostsEntry(list []v12.Service) string { if net.ParseIP(ip) == nil || domain == "" { continue } - entryList = append(entryList, entry{IP: ip, Domain: domain}) + entryList = append(entryList, Entry{IP: ip, Domain: domain}) } } } @@ -211,6 +210,7 @@ func generateHostsEntry(list []v12.Service) string { } return entryList[i].Domain > entryList[j].Domain }) + entryList = append(entryList, hosts...) var sb = new(bytes.Buffer) w := tabwriter.NewWriter(sb, 1, 1, 1, ' ', 0) diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index a8a6d18a..001848d0 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" "fmt" + "io" "math" "math/rand" "net" "net/netip" "net/url" "os" + "os/exec" "path/filepath" "strconv" "strings" @@ -83,6 +85,7 @@ type ConnectOptions struct { localTunIPv6 *net.IPNet apiServerIPs []net.IP + extraHost []dns.Entry } func (c *ConnectOptions) createRemoteInboundPod(ctx context.Context) (err error) { @@ -166,10 +169,10 @@ func (c *ConnectOptions) DoConnect() (err error) { return } c.deleteFirewallRule(ctx) - if err = c.setupDNS(); err != nil { + if err = c.addExtraRoute(ctx); err != nil { return } - if err = c.addExtraRoute(ctx); err != nil { + if err = c.setupDNS(); err != nil { return } go c.heartbeats() @@ -537,7 +540,7 @@ func (c *ConnectOptions) setupDNS() error { return err } // dump service in current namespace for support DNS resolve service:port - go dns.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace)) + go dns.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace), c.extraHost...) return nil } @@ -895,30 +898,41 @@ func (c *ConnectOptions) getCIDR(ctx context.Context) (err error) { return } -func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) { +func (c *ConnectOptions) addExtraRoute(ctx context.Context) error { if len(c.ExtraDomain) == 0 { - return + return nil } - var ips []string - ips, err = dns.GetDNSIPFromDnsPod(c.clientset) + ips, err := dns.GetDNSIPFromDnsPod(c.clientset) if err != nil { - return + return err } if len(ips) == 0 { err = fmt.Errorf("can't found any dns server") - return + return err } + ctx2, cancelFunc := context.WithTimeout(ctx, time.Second*10) + wait.UntilWithContext(ctx2, func(context.Context) { + for _, ip := range ips { + pong, err2 := util.Ping(ip) + if err2 == nil && pong { + ips = []string{ip} + cancelFunc() + return + } + } + }, time.Millisecond*50) + var r routing.Router r, err = netroute.New() if err != nil { - return + return err } var tunIface *net.Interface tunIface, err = tun.GetInterface() if err != nil { - return + return err } addRouteFunc := func(resource, ip string) { @@ -942,11 +956,38 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) { } } - client := &miekgdns.Client{Net: "udp", SingleInflight: true, DialTimeout: time.Second * 30} + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + go func() { + for _, domain := range c.ExtraDomain { + domain := domain + go func() { + for ; true; <-ticker.C { + func() { + // if use nslookup to query dns at first, it will speed up mikdns query process + subCtx, c2 := context.WithTimeout(ctx, time.Second*2) + defer c2() + cmd := exec.CommandContext(subCtx, "nslookup", domain, ips[0]) + cmd.Stderr = io.Discard + cmd.Stdout = io.Discard + _ = cmd.Start() + _ = cmd.Wait() + }() + } + }() + } + }() + + client := &miekgdns.Client{Net: "udp", Timeout: time.Second * 2, SingleInflight: true} for _, domain := range c.ExtraDomain { - for _, qType := range []uint16{miekgdns.TypeA, miekgdns.TypeAAAA} { + var success = false + for _, qType := range []uint16{miekgdns.TypeA /*, miekgdns.TypeAAAA*/} { + var iErr = errors.New("No retry") err = retry.OnError( - retry.DefaultRetry, + wait.Backoff{ + Steps: 1000, + Duration: time.Millisecond * 30, + }, func(err error) bool { return err != nil }, @@ -966,22 +1007,35 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) { if err != nil { return err } + if len(answer.Answer) == 0 { + return iErr + } for _, rr := range answer.Answer { switch a := rr.(type) { case *miekgdns.A: addRouteFunc(domain, a.A.String()) + c.extraHost = append(c.extraHost, dns.Entry{IP: a.A.String(), Domain: domain}) + success = true case *miekgdns.AAAA: addRouteFunc(domain, a.AAAA.String()) + c.extraHost = append(c.extraHost, dns.Entry{IP: a.AAAA.String(), Domain: domain}) + success = true } } return nil }) - if err != nil { + if err != nil && err != iErr { return err } + if success { + break + } + } + if !success { + return fmt.Errorf("failed to resolve dns for domain %s", domain) } } - return + return nil } func (c *ConnectOptions) GetKubeconfigPath() (string, error) { diff --git a/pkg/handler/function_test.go b/pkg/handler/function_test.go index 294f12eb..d83175e2 100644 --- a/pkg/handler/function_test.go +++ b/pkg/handler/function_test.go @@ -345,3 +345,19 @@ func init() { log.Fatal(err) } } + +func TestWaitBackoff(t *testing.T) { + var last = time.Now() + _ = retry.OnError( + wait.Backoff{ + Steps: 10, + Duration: time.Millisecond * 50, + }, func(err error) bool { + return err != nil + }, func() error { + now := time.Now() + fmt.Println(now.Sub(last).String()) + last = now + return fmt.Errorf("") + }) +}