diff --git a/dns/dns_unix.go b/dns/dns_unix.go index 02e591f2..da594572 100644 --- a/dns/dns_unix.go +++ b/dns/dns_unix.go @@ -9,21 +9,20 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "kubevpn/remote" + "kubevpn/util" "os" "path/filepath" ) -func Dns(clientset *kubernetes.Clientset) error { - var dnsIP string +func Dns(ip string) error { var err error - if dnsIP, err = GetDNSIp(clientset); err != nil { - return err - } if err = os.MkdirAll(filepath.Join("/", "etc", "resolver"), fs.ModePerm); err != nil { log.Error(err) } filename := filepath.Join("/", "etc", "resolver", "local") - fileContent := "nameserver " + dnsIP + fileContent := "nameserver " + ip return ioutil.WriteFile(filename, []byte(fileContent), fs.ModePerm) } @@ -39,3 +38,14 @@ func GetDNSIp(clientset *kubernetes.Clientset) (string, error) { } return serviceList.Items[0].Spec.ClusterIP, nil } + +func GetDNSServiceIpFromPod(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, namespace string) string { + //if ip, err := GetDNSIp(clientset); err == nil && len(ip) != 0 { + // return ip + //} + if ip, err := util.Shell(clientset, restclient, config, remote.TrafficManager, namespace, "cat /etc/resolv.conf | grep nameserver | awk '{print$2}'"); err == nil && len(ip) != 0 { + return ip + } + log.Fatal("this should not happened") + return "" +} diff --git a/pkg/main.go b/pkg/main.go index 907e98ff..2d25c419 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -6,44 +6,47 @@ import ( "fmt" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/homedir" + cmdutil "k8s.io/kubectl/pkg/cmd/util" "kubevpn/dns" "kubevpn/exe" "kubevpn/remote" + "kubevpn/util" "net" - "path/filepath" "runtime" "strings" ) var ( - baseCfg = &baseConfig{} - namespace string - clientset *kubernetes.Clientset - config *restclient.Config - name string + baseCfg = &baseConfig{} + namespace string + clientset *kubernetes.Clientset + restclient *rest.RESTClient + config *rest.Config + name string ) func init() { var err error - clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( - &clientcmd.ClientConfigLoadingRules{ - ExplicitPath: filepath.Join(homedir.HomeDir(), clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName), - }, - nil, - ) - config, err = clientConfig.ClientConfig() - if err != nil { + configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag() + configFlags.KubeConfig = &clientcmd.RecommendedHomeFile + f := cmdutil.NewFactory(cmdutil.NewMatchVersionFlags(configFlags)) + + if config, err = f.ToRESTConfig(); err != nil { log.Fatal(err) } - clientset, err = kubernetes.NewForConfig(config) - if err != nil { + if restclient, err = rest.RESTClientFor(config); err != nil { + log.Fatal(err) + } + if clientset, err = kubernetes.NewForConfig(config); err != nil { + log.Fatal(err) + } + if namespace, _, err = f.ToRawKubeConfigLoader().Namespace(); err != nil { log.Fatal(err) } - namespace, _, _ = clientConfig.Namespace() k8sCIDR, err := getCIDR(clientset, namespace) if err != nil { @@ -80,7 +83,7 @@ func main() { readyChan := make(chan struct{}) stop := make(chan struct{}) go func() { - err := PortForwardPod(config, + err := util.PortForwardPod(config, clientset, name, namespace, @@ -94,8 +97,8 @@ func main() { }() <-readyChan log.Info("port forward ready") - - if err := dns.Dns(clientset); err != nil { + dnsServiceIp := dns.GetDNSServiceIpFromPod(clientset, restclient, config, remote.TrafficManager, namespace) + if err := dns.Dns(dnsServiceIp); err != nil { log.Fatal(err) } else { log.Info("dns service ok") diff --git a/pkg/util.go b/util/util.go similarity index 60% rename from pkg/util.go rename to util/util.go index da3345ec..0f5e0a61 100644 --- a/pkg/util.go +++ b/util/util.go @@ -1,14 +1,15 @@ -package main +package util import ( + "bytes" "context" "fmt" - "github.com/moby/term" + dockerterm "github.com/moby/term" log "github.com/sirupsen/logrus" + "io" v12 "k8s.io/api/autoscaling/v1" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -22,11 +23,11 @@ import ( clientgowatch "k8s.io/client-go/tools/watch" "k8s.io/client-go/transport/spdy" "k8s.io/client-go/util/retry" - "k8s.io/kubectl/pkg/cmd/util" - term2 "k8s.io/kubectl/pkg/util/term" + "k8s.io/kubectl/pkg/cmd/exec" "net" "net/http" "os" + "strings" "time" ) @@ -152,95 +153,58 @@ func ScaleDeploymentReplicasTo(options *kubernetes.Clientset, name, namespace st } } -type shellOptions interface { - GetNamespace() string - GetDeployment() string - GetLocalDir() string - GetRemoteDir() string - GetKubeconfig() string -} +func Shell(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, namespace, cmd string) (string, error) { + pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) -func Shell(client *kubernetes.Clientset, options shellOptions) error { - deployment, err2 := client.AppsV1().Deployments(options.GetNamespace()). - Get(context.TODO(), options.GetDeployment(), metav1.GetOptions{}) - if err2 != nil { - log.Error(err2) - } - labelMap, _ := metav1.LabelSelectorAsMap(deployment.Spec.Selector) - pods, err := client.CoreV1().Pods(options.GetNamespace()). - List(context.TODO(), metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labelMap).String()}) if err != nil { - log.Errorf("get kubedev pod error: %v", err) + return "", err } - if len(pods.Items) <= 0 { - log.Warnf("this should not happened, pods items length: %d", len(pods.Items)) + if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed { + err = fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase) + return "", err } - index := -1 - for i, pod := range pods.Items { - if pod.Status.Phase == v1.PodRunning { - index = i - break - } + containerName := pod.Spec.Containers[0].Name + stdin, stdout, stderr := dockerterm.StdStreams() + + stdoutBuf := bytes.NewBuffer(nil) + stdout = io.MultiWriter(stdout, stdoutBuf) + StreamOptions := exec.StreamOptions{ + Namespace: namespace, + PodName: podName, + ContainerName: containerName, + IOStreams: genericclioptions.IOStreams{In: stdin, Out: stdout, ErrOut: stderr}, } - if index < 0 { - return fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pods.Items[0].Status.Phase) - } - stdin, stdout, stderr := term.StdStreams() - tty := term2.TTY{ - Out: stdout, - In: stdin, - Raw: true, - } - if !tty.IsTerminalIn() { - log.Error("Unable to use a TTY - input is not a terminal or the right kind of file") - } - var terminalSizeQueue remotecommand.TerminalSizeQueue - if tty.Raw { - terminalSizeQueue = tty.MonitorSize(tty.GetSize()) - } - f := func() error { - configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag() - kubeconfig := options.GetKubeconfig() - configFlags.KubeConfig = &kubeconfig - namespace := options.GetNamespace() - configFlags.Namespace = &namespace - f := util.NewFactory(util.NewMatchVersionFlags(configFlags)) - config, _ := f.ToRESTConfig() - restClient, err := rest.RESTClientFor(config) - if err != nil { - return err - } - req := restClient.Post(). - Resource("pods"). - Name(pods.Items[index].Name). - Namespace(options.GetNamespace()). - SubResource("exec"). - VersionedParams( - &v1.PodExecOptions{ - Container: pods.Items[index].Spec.Containers[0].Name, - Command: []string{"sh", "-c", "(bash||sh)"}, - Stdin: true, - Stdout: true, - Stderr: true, - TTY: true, - }, - scheme.ParameterCodec, - ) - executor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) - if err != nil { - return err - } - return executor.Stream(remotecommand.StreamOptions{ - Stdin: tty.In, - Stdout: tty.Out, - Stderr: stderr, - Tty: true, - TerminalSizeQueue: terminalSizeQueue, - }) + Executor := &exec.DefaultRemoteExecutor{} + // ensure we can recover the terminal while attached + tt := StreamOptions.SetupTTY() + + var sizeQueue remotecommand.TerminalSizeQueue + if tt.Raw { + // this call spawns a goroutine to monitor/update the terminal size + sizeQueue = tt.MonitorSize(tt.GetSize()) + + // unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is + // true + StreamOptions.ErrOut = nil } - if err = tty.Safe(f); err != nil { - return err + fn := func() error { + req := restclient.Post(). + Resource("pods"). + Name(pod.Name). + Namespace(pod.Namespace). + SubResource("exec") + req.VersionedParams(&v1.PodExecOptions{ + Container: containerName, + Command: []string{"sh", "-c", cmd}, + Stdin: StreamOptions.Stdin, + Stdout: StreamOptions.Out != nil, + Stderr: StreamOptions.ErrOut != nil, + TTY: tt.Raw, + }, scheme.ParameterCodec) + return Executor.Execute("POST", req.URL(), config, StreamOptions.In, StreamOptions.Out, StreamOptions.ErrOut, tt.Raw, sizeQueue) } - return nil + + err = tt.Safe(fn) + return strings.TrimRight(stdoutBuf.String(), "\n"), err } diff --git a/util/util_test.go b/util/util_test.go new file mode 100644 index 00000000..3cf6c9a6 --- /dev/null +++ b/util/util_test.go @@ -0,0 +1,51 @@ +package util + +import ( + "context" + "fmt" + log "github.com/sirupsen/logrus" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/kubectl/pkg/cmd/util" + "kubevpn/remote" + "testing" +) + +var ( + namespace string + clientset *kubernetes.Clientset + restclient *rest.RESTClient + config *rest.Config +) + +func TestShell(t *testing.T) { + var err error + + configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag() + configFlags.KubeConfig = &clientcmd.RecommendedHomeFile + f := util.NewFactory(util.NewMatchVersionFlags(configFlags)) + + if config, err = f.ToRESTConfig(); err != nil { + log.Fatal(err) + } + if restclient, err = rest.RESTClientFor(config); err != nil { + log.Fatal(err) + } + if clientset, err = kubernetes.NewForConfig(config); err != nil { + log.Fatal(err) + } + if namespace, _, err = f.ToRawKubeConfigLoader().Namespace(); err != nil { + log.Fatal(err) + } + + out, err := Shell(clientset, restclient, config, remote.TrafficManager, namespace, "cat /etc/resolv.conf | grep nameserver | awk '{print$2}'") + serviceList, err := clientset.CoreV1().Services(v1.NamespaceSystem).List(context.Background(), v1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", "kube-dns").String(), + }) + + fmt.Println(out == serviceList.Items[0].Spec.ClusterIP) +}