diff --git a/TODO.MD b/TODO.MD index 366473cc..eca20002 100644 --- a/TODO.MD +++ b/TODO.MD @@ -1,3 +1,5 @@ # TODO -## 域名解析功能 \ No newline at end of file +## 域名解析功能 --完成 + +## 多个service inbound -- 完成 \ No newline at end of file diff --git a/pkg/main.go b/pkg/main.go index a50449cc..fb05f6f4 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -27,6 +27,7 @@ var ( baseCfg = &baseConfig{} kubeconfigpath string namespace string + services string clientset *kubernetes.Clientset restclient *rest.RESTClient config *rest.Config @@ -35,8 +36,9 @@ var ( func init() { flag.StringVar(&kubeconfigpath, "kubeconfig", clientcmd.RecommendedHomeFile, "kubeconfig") flag.StringVar(&namespace, "namespace", "", "namespace") + flag.StringVar(&services, "services", "", "services") flag.Parse() - + fmt.Printf("kubeconfig path: %s, namespace: %s, serivces: %s\n", kubeconfigpath, namespace, services) var err error configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag() configFlags.KubeConfig = &kubeconfigpath @@ -68,10 +70,6 @@ func prepare() { IP: net.IPv4(192, 168, 254, 100), Mask: net.IPv4Mask(255, 255, 255, 0), } - err = remote.CreateServer(clientset, namespace, trafficManager.String()) - if err != nil { - log.Fatal(err) - } err = remote.InitDHCP(clientset, namespace, &net.IPNet{IP: net.IPv4(196, 168, 254, 100), Mask: net.IPv4Mask(255, 255, 255, 0)}) if err != nil { @@ -81,7 +79,20 @@ func prepare() { if err != nil { log.Fatal(err) } - remote.AddCleanUpResourceHandler(clientset, namespace, tunIp) + pod, err := remote.CreateServerOutbound(clientset, namespace, trafficManager.String()) + if err != nil { + log.Fatal(err) + } + tempIps := []*net.IPNet{tunIp} + for _, service := range strings.Split(services, ",") { + virtualShadowIp, _ := remote.GetRandomIpFromDHCP(clientset, namespace) + tempIps = append(tempIps, virtualShadowIp) + err = remote.CreateServerOutboundAndInbound(clientset, namespace, service, tunIp.IP.String(), pod.Status.PodIP, virtualShadowIp.String()) + if err != nil { + log.Error(err) + } + } + remote.AddCleanUpResourceHandler(clientset, namespace, services, tempIps...) //if runtime.GOOS == "windows" { tunIp.Mask = net.IPv4Mask(0, 0, 0, 0) //} else { @@ -115,7 +126,7 @@ func main() { readyChan := make(chan struct{}) stop := make(chan struct{}) go func() { - err := util.PortForwardPod(config, clientset, remote.TrafficManager, namespace, "10800:10800", readyChan, stop) + err := util.PortForwardPod(config, clientset, util.TrafficManager, namespace, "10800:10800", readyChan, stop) if err != nil { log.Error(err) } @@ -127,7 +138,7 @@ func main() { log.Fatal(err) } //time.Sleep(time.Second * 5) - dnsServiceIp := util.GetDNSServiceIpFromPod(clientset, restclient, config, remote.TrafficManager, namespace) + dnsServiceIp := util.GetDNSServiceIpFromPod(clientset, restclient, config, util.TrafficManager, namespace) if err := dns.DNS(dnsServiceIp); err != nil { log.Fatal(err) } diff --git a/remote/dhcp.go b/remote/dhcp.go index 3b828f11..5d5610d7 100644 --- a/remote/dhcp.go +++ b/remote/dhcp.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" + "kubevpn/util" "net" "os" "os/signal" @@ -27,15 +28,20 @@ import ( var stopChan = make(chan os.Signal) -func AddCleanUpResourceHandler(client *kubernetes.Clientset, namespace string, ip *net.IPNet) { +func AddCleanUpResourceHandler(client *kubernetes.Clientset, namespace string, services string, ip ...*net.IPNet) { signal.Notify(stopChan, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL /*, syscall.SIGSTOP*/) go func() { <-stopChan log.Info("prepare to exit, cleaning up") //cleanUpTrafficManagerIfRefCountIsZero(client, namespace) - err := ReleaseIpToDHCP(client, namespace, ip) - if err != nil { - log.Errorf("failed to release ip to dhcp, err: %v", err) + for _, ipNet := range ip { + err := ReleaseIpToDHCP(client, namespace, ipNet) + if err != nil { + log.Errorf("failed to release ip to dhcp, err: %v", err) + } + } + for _, s := range strings.Split(services, ",") { + util.ScaleDeploymentReplicasTo(client, s, namespace, 1) } log.Info("clean up successful") os.Exit(0) @@ -43,12 +49,15 @@ func AddCleanUpResourceHandler(client *kubernetes.Clientset, namespace string, i } func deletePod(client *kubernetes.Clientset, podName, namespace string, wait bool) { - err := client.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + zero := int64(0) + err := client.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{ + GracePeriodSeconds: &zero, + }) if !wait { return } if err != nil && errors.IsNotFound(err) { - log.Info("not found shadow pod, no need to delete it") + log.Infof("not found shadow pod: %s, no need to delete it", podName) return } log.Infof("waiting for pod: %s to be deleted...", podName) @@ -81,7 +90,7 @@ func updateRefCount(client *kubernetes.Clientset, namespace string, increment in retry.DefaultRetry, func(err error) bool { return err != nil }, func() error { - configMap, err := client.CoreV1().ConfigMaps(namespace).Get(context.TODO(), TrafficManager, metav1.GetOptions{}) + configMap, err := client.CoreV1().ConfigMaps(namespace).Get(context.TODO(), util.TrafficManager, metav1.GetOptions{}) if err != nil { log.Errorf("update ref-count failed, increment: %d, error: %v", increment, err) return err @@ -99,7 +108,7 @@ func updateRefCount(client *kubernetes.Clientset, namespace string, increment in }, }) _, err = client.CoreV1().ConfigMaps(namespace). - Patch(context.TODO(), TrafficManager, types.JSONPatchType, patch, metav1.PatchOptions{}) + Patch(context.TODO(), util.TrafficManager, types.JSONPatchType, patch, metav1.PatchOptions{}) return err }, ) @@ -112,7 +121,7 @@ func updateRefCount(client *kubernetes.Clientset, namespace string, increment in func cleanUpTrafficManagerIfRefCountIsZero(client *kubernetes.Clientset, namespace string) { updateRefCount(client, namespace, -1) - configMap, err := client.CoreV1().ConfigMaps(namespace).Get(context.TODO(), TrafficManager, metav1.GetOptions{}) + configMap, err := client.CoreV1().ConfigMaps(namespace).Get(context.TODO(), util.TrafficManager, metav1.GetOptions{}) if err != nil { log.Error(err) return @@ -125,13 +134,13 @@ func cleanUpTrafficManagerIfRefCountIsZero(client *kubernetes.Clientset, namespa // if refcount is less than zero or equals to zero, means no body will using this dns pod, so clean it if refCount <= 0 { log.Info("refCount is zero, prepare to clean up resource") - _ = client.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), TrafficManager, metav1.DeleteOptions{}) - _ = client.CoreV1().Pods(namespace).Delete(context.TODO(), TrafficManager, metav1.DeleteOptions{}) + _ = client.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), util.TrafficManager, metav1.DeleteOptions{}) + _ = client.CoreV1().Pods(namespace).Delete(context.TODO(), util.TrafficManager, metav1.DeleteOptions{}) } } func InitDHCP(client *kubernetes.Clientset, namespace string, addr *net.IPNet) error { - get, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), TrafficManager, metav1.GetOptions{}) + get, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), util.TrafficManager, metav1.GetOptions{}) if err == nil && get != nil { return nil } @@ -146,7 +155,7 @@ func InitDHCP(client *kubernetes.Clientset, namespace string, addr *net.IPNet) e } result := &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Name: TrafficManager, + Name: util.TrafficManager, Namespace: namespace, Labels: map[string]string{}, }, @@ -161,7 +170,7 @@ func InitDHCP(client *kubernetes.Clientset, namespace string, addr *net.IPNet) e } func GetIpFromDHCP(client *kubernetes.Clientset, namespace string) (*net.IPNet, error) { - get, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), TrafficManager, metav1.GetOptions{}) + get, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), util.TrafficManager, metav1.GetOptions{}) if err != nil { log.Errorf("failed to get ip from dhcp, err: %v", err) return nil, err @@ -183,6 +192,31 @@ func GetIpFromDHCP(client *kubernetes.Clientset, namespace string) (*net.IPNet, }, nil } +func GetRandomIpFromDHCP(client *kubernetes.Clientset, namespace string) (*net.IPNet, error) { + get, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), util.TrafficManager, metav1.GetOptions{}) + if err != nil { + log.Errorf("failed to get ip from dhcp, err: %v", err) + return nil, err + } + split := strings.Split(get.Data["DHCP"], ",") + + ip := split[0] + split = split[1:] + + get.Data["DHCP"] = strings.Join(split, ",") + _, err = client.CoreV1().ConfigMaps(namespace).Update(context.Background(), get, metav1.UpdateOptions{}) + if err != nil { + log.Errorf("update dhcp error after get ip, need to put ip back, err: %v", err) + return nil, err + } + + atoi, _ := strconv.Atoi(ip) + return &net.IPNet{ + IP: net.IPv4(192, 168, 254, byte(atoi)), + Mask: net.IPv4Mask(255, 255, 255, 0), + }, nil +} + func getIp(availableIp []string) (int, []string) { var v uint32 interfaces, _ := net.Interfaces() @@ -250,7 +284,7 @@ func BytesToInt(b []byte) uint32 { } func ReleaseIpToDHCP(client *kubernetes.Clientset, namespace string, ip *net.IPNet) error { - get, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), TrafficManager, metav1.GetOptions{}) + get, err := client.CoreV1().ConfigMaps(namespace).Get(context.Background(), util.TrafficManager, metav1.GetOptions{}) if err != nil { log.Errorf("failed to get dhcp, err: %v", err) return err diff --git a/remote/remote.go b/remote/remote.go index 7518aea6..2ffd544e 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -11,16 +11,15 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/kubectl/pkg/polymorphichelpers" "k8s.io/kubectl/pkg/util/podutils" + "kubevpn/util" "sort" "time" ) -const TrafficManager = "kubevpn.traffic.manager" - -func CreateServer(clientset *kubernetes.Clientset, namespace, ip string) error { +func CreateServerOutbound(clientset *kubernetes.Clientset, namespace, serverIp string) (*v1.Pod, error) { firstPod, i, err3 := polymorphichelpers.GetFirstPod(clientset.CoreV1(), namespace, - fields.OneTermEqualSelector("app", TrafficManager).String(), + fields.OneTermEqualSelector("app", util.TrafficManager).String(), time.Second*5, func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) @@ -28,17 +27,17 @@ func CreateServer(clientset *kubernetes.Clientset, namespace, ip string) error { ) if err3 == nil && i != 0 && firstPod != nil { - return nil + return firstPod, nil } t := true zero := int64(0) - name := TrafficManager + name := util.TrafficManager pod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, - Labels: map[string]string{"app": TrafficManager}, + Labels: map[string]string{"app": util.TrafficManager}, }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -53,13 +52,21 @@ func CreateServer(clientset *kubernetes.Clientset, namespace, ip string) error { "iptables -P FORWARD ACCEPT;" + "iptables -t nat -A POSTROUTING -s 192.168.254.0/24 -o eth0 -j MASQUERADE;" + "iptables -t nat -A POSTROUTING -s 172.20.0.0/16 -o eth0 -j MASQUERADE;" + - "gost -L socks5://:10800 -L tun://:8421?net=" + ip + " -D", + "gost -L socks5://:10800 -L tun://:8421?net=" + serverIp + " -D", + }, + // todo get pod ip + Lifecycle: &v1.Lifecycle{ + PostStart: &v1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"env"}, + }, + }, }, SecurityContext: &v1.SecurityContext{ Capabilities: &v1.Capabilities{ Add: []v1.Capability{ "NET_ADMIN", - "SYS_MODULE", + //"SYS_MODULE", }, }, RunAsUser: &zero, @@ -75,7 +82,7 @@ func CreateServer(clientset *kubernetes.Clientset, namespace, ip string) error { v1.ResourceMemory: resource.MustParse("512Mi"), }, }, - ImagePullPolicy: v1.PullIfNotPresent, + ImagePullPolicy: v1.PullAlways, }, }, PriorityClassName: "system-cluster-critical", @@ -90,13 +97,90 @@ func CreateServer(clientset *kubernetes.Clientset, namespace, ip string) error { log.Fatal(err) } tick := time.Tick(time.Minute * 2) -out: for { select { case e := <-watch.ResultChan(): if e.Object.(*v1.Pod).Status.Phase == v1.PodRunning { watch.Stop() - break out + return e.Object.(*v1.Pod), nil + } + case <-tick: + watch.Stop() + log.Error("timeout") + return nil, errors.New("timeout") + } + } +} + +func CreateServerOutboundAndInbound(clientset *kubernetes.Clientset, namespace, service string, virtualLocalIp, realRouterIP, virtualShadowIp string) error { + lables := updateReplicasToZeroAndGetLabels(clientset, namespace, service) + newName := service + "-" + "shadow" + t := true + zero := int64(0) + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: newName, + Namespace: namespace, + Labels: lables, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "vpn", + Image: "naison/kubevpn:latest", + Command: []string{"/bin/sh", "-c"}, + Args: []string{ + "sysctl net.ipv4.ip_forward=1;" + + "iptables -F;" + + "iptables -P INPUT ACCEPT;" + + "iptables -P FORWARD ACCEPT;" + + "iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 2000:60000 -j DNAT --to " + virtualLocalIp + ":2000-60000;" + + "iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 2000:60000 -j MASQUERADE;" + + "iptables -t nat -A PREROUTING -i eth0 -p udp --dport 2000:60000 -j DNAT --to " + virtualLocalIp + ":2000-60000;" + + "iptables -t nat -A POSTROUTING -p udp -m udp --dport 2000:60000 -j MASQUERADE;" + + "gost -L 'tun://0.0.0.0:8421/127.0.0.1:8421?net=" + virtualShadowIp + "&route=172.20.0.0/16' -F 'socks5://" + realRouterIP + ":10800?notls=true'", + }, + SecurityContext: &v1.SecurityContext{ + Capabilities: &v1.Capabilities{ + Add: []v1.Capability{ + "NET_ADMIN", + //"SYS_MODULE", + }, + }, + RunAsUser: &zero, + Privileged: &t, + }, + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("128m"), + v1.ResourceMemory: resource.MustParse("128Mi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("256m"), + v1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + ImagePullPolicy: v1.PullAlways, + }, + }, + PriorityClassName: "system-cluster-critical", + }, + } + _, err2 := clientset.CoreV1().Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}) + if err2 != nil { + log.Fatal(err2) + } + watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: newName})) + if err != nil { + log.Fatal(err) + } + tick := time.Tick(time.Minute * 2) + for { + select { + case e := <-watch.ResultChan(): + if e.Object.(*v1.Pod).Status.Phase == v1.PodRunning { + watch.Stop() + return nil } case <-tick: watch.Stop() @@ -104,5 +188,36 @@ out: return errors.New("timeout") } } - return nil +} + +func updateReplicasToZeroAndGetLabels(clientset *kubernetes.Clientset, namespace, service string) map[string]string { + if service == "" || namespace == "" { + log.Info("no need to expose local service to remote") + return nil + } + log.Info("prepare to expose local service to remote service: " + service) + util.ScaleDeploymentReplicasTo(clientset, service, namespace, 0) + labels := getLabels(clientset, namespace, service) + if labels == nil { + log.Info("fail to create shadow") + return nil + } + return labels +} +func getLabels(clientset *kubernetes.Clientset, namespace, service string) map[string]string { + get, err := clientset.CoreV1().Services(namespace). + Get(context.TODO(), service, metav1.GetOptions{}) + if err != nil { + log.Error(err) + return nil + } + selector := get.Spec.Selector + _, err = clientset.AppsV1().Deployments(namespace).Get(context.TODO(), service, metav1.GetOptions{}) + if err != nil { + log.Error(err) + return nil + } + newName := service + "-" + "shadow" + deletePod(clientset, newName, namespace, true) + return selector } diff --git a/remote/remote_test.go b/remote/remote_test.go index 011c7117..43fb389d 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -28,7 +28,7 @@ func TestCreateServer(t *testing.T) { log.Fatal(err) } - server := CreateServer(clientset, "test", "192.168.254.100/24") + server, err := CreateServerOutbound(clientset, "test", "192.168.254.100/24") fmt.Println(server) } diff --git a/util/const.go b/util/const.go new file mode 100644 index 00000000..ae26ce9b --- /dev/null +++ b/util/const.go @@ -0,0 +1,3 @@ +package util + +const TrafficManager = "kubevpn.traffic.manager" diff --git a/util/util.go b/util/util.go index 6e5aabb8..83836be1 100644 --- a/util/util.go +++ b/util/util.go @@ -26,7 +26,6 @@ import ( "k8s.io/client-go/transport/spdy" "k8s.io/client-go/util/retry" "k8s.io/kubectl/pkg/cmd/exec" - "kubevpn/remote" "net" "net/http" "os" @@ -229,7 +228,7 @@ func GetDNSServiceIpFromPod(clientset *kubernetes.Clientset, restclient *rest.RE if ip, err := GetDNSIp(clientset); err == nil && len(ip) != 0 { return ip } - if ip, err := Shell(clientset, restclient, config, remote.TrafficManager, namespace, "cat /etc/resolv.conf | grep nameserver | awk '{print$2}'"); err == nil && len(ip) != 0 { + if ip, err := Shell(clientset, restclient, config, TrafficManager, namespace, "cat /etc/resolv.conf | grep nameserver | awk '{print$2}'"); err == nil && len(ip) != 0 { return ip } log.Fatal("this should not happened") diff --git a/util/util_test.go b/util/util_test.go index 3cf6c9a6..577d8282 100644 --- a/util/util_test.go +++ b/util/util_test.go @@ -11,7 +11,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/kubectl/pkg/cmd/util" - "kubevpn/remote" "testing" ) @@ -42,7 +41,7 @@ func TestShell(t *testing.T) { log.Fatal(err) } - out, err := Shell(clientset, restclient, config, remote.TrafficManager, namespace, "cat /etc/resolv.conf | grep nameserver | awk '{print$2}'") + out, err := Shell(clientset, restclient, config, TrafficManager, namespace, "cat /etc/resolv.conf | grep nameserver | awk '{print$2}'") serviceList, err := clientset.CoreV1().Services(v1.NamespaceSystem).List(context.Background(), v1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", "kube-dns").String(), })