diff --git a/cmd/connect.go b/cmd/connect.go index 034236c1..05c1dc09 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -29,6 +29,9 @@ var connectCmd = &cobra.Command{ Long: `connect`, PreRun: func(*cobra.Command, []string) { util.InitLogger(util.Debug) + if util.IsWindows() { + driver.InstallWireGuardTunDriver() + } }, Run: func(cmd *cobra.Command, args []string) { connect.InitClient() diff --git a/cmd/serve.go b/cmd/serve.go index c04c85bd..9a1dcd5d 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -24,9 +24,12 @@ var ServerCmd = &cobra.Command{ util.InitLogger(util.Debug) }, Run: func(cmd *cobra.Command, args []string) { - if err := pkg.Start(config); err != nil { + c, err := pkg.Start(config) + if err != nil { + log.Fatal(err) + } + if err := <-c; err != nil { log.Fatal(err) } - select {} }, } diff --git a/pkg/connect.go b/pkg/connect.go index 840eeec5..5cace7e4 100644 --- a/pkg/connect.go +++ b/pkg/connect.go @@ -6,7 +6,6 @@ import ( "fmt" log "github.com/sirupsen/logrus" "github.com/wencaiwulue/kubevpn/dns" - "github.com/wencaiwulue/kubevpn/driver" "github.com/wencaiwulue/kubevpn/remote" "github.com/wencaiwulue/kubevpn/util" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -39,35 +38,27 @@ type ConnectOptions struct { restclient *rest.RESTClient config *rest.Config factory cmdutil.Factory + cidrs []*net.IPNet + routerIP string + dhcp *remote.DHCPManager } -func (c *ConnectOptions) createRemotePod() { - k8sCIDRs, err := getCIDR(c.clientset, c.Namespace) - if err != nil { - log.Fatal(err) - } +var trafficManager = net.IPNet{ + IP: net.IPv4(223, 254, 254, 100), + Mask: net.CIDRMask(24, 32), +} + +func (c *ConnectOptions) createRemoteInboundPod() { var list []string - for _, ipNet := range k8sCIDRs { + for _, ipNet := range c.cidrs { list = append(list, ipNet.String()) } - trafficManager := net.IPNet{ - IP: net.IPv4(223, 254, 254, 100), - Mask: net.CIDRMask(24, 32), + tunIp, err := c.dhcp.RentIPBaseNICAddress() + if err != nil { + log.Fatal(err) } - manager := remote.NewDHCPManager(c.clientset, c.Namespace, &trafficManager) - if err = manager.InitDHCP(); err != nil { - log.Fatal(err) - } - tunIp, err := manager.RentIPBaseNICAddress() - if err != nil { - log.Fatal(err) - } - pod, err := CreateServerOutbound(c.clientset, c.Namespace, &trafficManager, k8sCIDRs) - if err != nil { - log.Fatal(err) - } tempIps := []*net.IPNet{tunIp} wg := sync.WaitGroup{} lock := sync.Mutex{} @@ -77,7 +68,7 @@ func (c *ConnectOptions) createRemotePod() { go func(finalWorkload string) { defer wg.Done() lock.Lock() - virtualShadowIp, _ := manager.RentIPRandom() + virtualShadowIp, _ := c.dhcp.RentIPRandom() tempIps = append(tempIps, virtualShadowIp) lock.Unlock() @@ -89,18 +80,18 @@ func (c *ConnectOptions) createRemotePod() { c.Namespace, finalWorkload, tunIp.IP.String(), - pod.Status.PodIP, + c.routerIP, virtualShadowIp.String(), strings.Join(list, ","), ) } else { - err = CreateServerInbound( + err = CreateInboundPod( c.factory, c.clientset, c.Namespace, finalWorkload, tunIp.IP.String(), - pod.Status.PodIP, + c.routerIP, virtualShadowIp.String(), strings.Join(list, ","), ) @@ -112,7 +103,7 @@ func (c *ConnectOptions) createRemotePod() { } } wg.Wait() - remote.AddCleanUpResourceHandler(c.clientset, c.Namespace, c.Workloads, manager, tempIps...) + remote.AddCleanUpResourceHandler(c.clientset, c.Namespace, c.Workloads, c.dhcp, tempIps...) if util.IsWindows() { tunIp.Mask = net.CIDRMask(0, 32) } else { @@ -125,14 +116,44 @@ func (c *ConnectOptions) createRemotePod() { c.nodeConfig.ServeNodes = []string{fmt.Sprintf("tun://:8421/127.0.0.1:8421?net=%s&route=%s", tunIp.String(), strings.Join(list, ","))} log.Info("your ip is " + tunIp.String()) - - if util.IsWindows() { - driver.InstallWireGuardTunDriver() - } } func (c *ConnectOptions) DoConnect() { - c.createRemotePod() + var err error + c.cidrs, err = getCIDR(c.clientset, c.Namespace) + if err != nil { + log.Fatal(err) + } + c.routerIP, err = CreateOutboundRouterPod(c.clientset, c.Namespace, &trafficManager, c.cidrs) + if err != nil { + log.Fatal(err) + } + c.dhcp = remote.NewDHCPManager(c.clientset, c.Namespace, &trafficManager) + if err = c.dhcp.InitDHCP(); err != nil { + log.Fatal(err) + } + c.createRemoteInboundPod() + c.portForward() + c.startLocalTunServe() +} + +func (c ConnectOptions) heartbeats() { + go func() { + tick := time.Tick(time.Second * 15) + c2 := make(chan struct{}, 1) + c2 <- struct{}{} + for { + select { + case <-tick: + c2 <- struct{}{} + case <-c2: + _ = exec.Command("ping", "-c", "4", "223.254.254.100").Run() + } + } + }() +} + +func (c *ConnectOptions) portForward() { var readyChanRef *chan struct{} ctx, cancelFunc := context.WithCancel(context.Background()) remote.CancelFunctions = append(remote.CancelFunctions, cancelFunc) @@ -169,8 +190,11 @@ func (c *ConnectOptions) DoConnect() { } <-*readyChanRef log.Info("port forward ready") +} - if err := Start(c.nodeConfig); err != nil { +func (c *ConnectOptions) startLocalTunServe() { + errChan, err := Start(c.nodeConfig) + if err != nil { log.Fatal(err) } @@ -180,21 +204,14 @@ func (c *ConnectOptions) DoConnect() { } util.DeleteWindowsFirewallRule() } + c.heartbeats() + c.setupDNS() log.Info("dns service ok") - go func() { - tick := time.Tick(time.Second * 15) - c2 := make(chan struct{}, 1) - c2 <- struct{}{} - for { - select { - case <-tick: - c2 <- struct{}{} - case <-c2: - _ = exec.Command("ping", "-c", "4", "223.254.254.100").Run() - } - } - }() + // wait for exit + <-errChan +} +func (c ConnectOptions) setupDNS() { relovConf, err := dns.GetDNSServiceIPFromPod(c.clientset, c.restclient, c.config, util.TrafficManager, c.Namespace) if err != nil { log.Fatal(err) @@ -202,31 +219,30 @@ func (c *ConnectOptions) DoConnect() { if err = dns.SetupDNS(relovConf); err != nil { log.Fatal(err) } - // wait for exit - <-ctx.Done() } -func Start(r Route) error { +func Start(r Route) (chan error, error) { routers, err := r.GenRouters() if err != nil { - return err + return nil, err } if len(routers) == 0 { - return errors.New("invalid config") + return nil, errors.New("invalid config") } - + c := make(chan error, len(routers)) for i := range routers { ctx, cancelFunc := context.WithCancel(context.Background()) remote.CancelFunctions = append(remote.CancelFunctions, cancelFunc) - go func(finalCtx context.Context, finalI int) { + go func(finalCtx context.Context, finalI int, c chan error) { if err = routers[finalI].Serve(finalCtx); err != nil { log.Warn(err) + c <- err } - }(ctx, i) + }(ctx, i, c) } - return nil + return c, nil } func getCIDR(clientset *kubernetes.Clientset, namespace string) ([]*net.IPNet, error) { diff --git a/pkg/remote.go b/pkg/remote.go index db999972..274b02ac 100644 --- a/pkg/remote.go +++ b/pkg/remote.go @@ -20,7 +20,7 @@ import ( "time" ) -func CreateServerOutbound(clientset *kubernetes.Clientset, namespace string, serverIp *net.IPNet, nodeCIDR []*net.IPNet) (*v1.Pod, error) { +func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string, serverIp *net.IPNet, nodeCIDR []*net.IPNet) (string, error) { firstPod, i, err3 := polymorphichelpers.GetFirstPod(clientset.CoreV1(), namespace, fields.OneTermEqualSelector("app", util.TrafficManager).String(), @@ -32,7 +32,7 @@ func CreateServerOutbound(clientset *kubernetes.Clientset, namespace string, ser if err3 == nil && i != 0 && firstPod != nil { remote.UpdateRefCount(clientset, namespace, firstPod.Name, 1) - return firstPod, nil + return firstPod.Status.PodIP, nil } args := []string{ "sysctl net.ipv4.ip_forward=1", @@ -104,17 +104,17 @@ func CreateServerOutbound(clientset *kubernetes.Clientset, namespace string, ser case e := <-watch.ResultChan(): if e.Object.(*v1.Pod).Status.Phase == v1.PodRunning { watch.Stop() - return e.Object.(*v1.Pod), nil + return e.Object.(*v1.Pod).Status.PodIP, nil } case <-tick: watch.Stop() log.Error("timeout") - return nil, errors.New("timeout") + return "", errors.New("timeout") } } } -func CreateServerInbound(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads, virtualLocalIp, realRouterIP, virtualShadowIp, routes string) error { +func CreateInboundPod(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads, virtualLocalIp, realRouterIP, virtualShadowIp, routes string) error { resourceTuple, parsed, err2 := util.SplitResourceTypeName(workloads) if !parsed || err2 != nil { return errors.New("not need") diff --git a/remote/remote_test.go b/remote/remote_test.go index 8395a84a..910c882b 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -48,7 +48,7 @@ func TestCreateServer(t *testing.T) { Mask: net.IPv4Mask(255, 255, 0, 0), } - server, err := pkg.CreateServerOutbound(clientset, "test", i, []*net.IPNet{j}) + server, err := pkg.CreateOutboundRouterPod(clientset, "test", i, []*net.IPNet{j}) fmt.Println(server) }