refoctor: ssh logic (#194)

* feat: optimize ssh
This commit is contained in:
naison
2024-03-08 19:16:29 +08:00
committed by GitHub
parent f3d1c99a04
commit 600e35b8d7
4 changed files with 150 additions and 34 deletions

View File

@@ -14,8 +14,7 @@ import (
)
// CmdSSHDaemon
// 设置本地的IP是223.254.0.1/32 ,记得一定是掩码 32位,
// 这样别的路由不会走到这里来
// set local tun ip 223.254.0.1/32, remember to use mask 32
func CmdSSHDaemon(_ cmdutil.Factory) *cobra.Command {
var clientIP string
cmd := &cobra.Command{
@@ -41,8 +40,8 @@ func CmdSSHDaemon(_ cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
fmt.Fprint(os.Stdout, client.ServerIP)
return nil
_, err = fmt.Fprint(os.Stdout, client.ServerIP)
return err
},
}
cmd.Flags().StringVar(&clientIP, "client-ip", "", "Client cidr")

View File

@@ -2,6 +2,7 @@ package handler
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
@@ -21,6 +22,7 @@ import (
"golang.org/x/net/websocket"
"golang.org/x/oauth2"
"k8s.io/client-go/tools/remotecommand"
"sigs.k8s.io/yaml"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/core"
@@ -87,9 +89,6 @@ func (w *wsHandler) handle(ctx context.Context) {
w.Log("Port map error: %v", err)
return
}
// startup daemon process if daemon process not start
startDaemonCmd := fmt.Sprintf(`export %s=%s && kubevpn get service > /dev/null 2>&1 &`, config.EnvStartSudoKubeVPNByKubeVPN, "true")
util.RemoteRun(cli, startDaemonCmd, nil)
cmd := fmt.Sprintf(`export %s=%s && kubevpn ssh-daemon --client-ip %s`, config.EnvStartSudoKubeVPNByKubeVPN, "true", clientIP.String())
serverIP, stderr, err := util.RemoteRun(cli, cmd, nil)
if err != nil {
@@ -101,7 +100,7 @@ func (w *wsHandler) handle(ctx context.Context) {
}
ip, _, err := net.ParseCIDR(string(serverIP))
if err != nil {
w.Log("Parse server ip error: %v", err)
w.Log("Parse server ip %s, stderr: %s: %v", string(serverIP), string(stderr), err)
return
}
msg := fmt.Sprintf("| You can use client: %s to communicate with server: %s |", clientIP.IP.String(), ip.String())
@@ -151,6 +150,47 @@ func (w *wsHandler) handle(ctx context.Context) {
return
}
// startup daemon process if daemon process not start
func startDaemonProcess(cli *ssh.Client) {
startDaemonCmd := fmt.Sprintf(`export %s=%s && kubevpn get service > /dev/null 2>&1 &`, config.EnvStartSudoKubeVPNByKubeVPN, "true")
_, _, _ = util.RemoteRun(cli, startDaemonCmd, nil)
ticker := time.NewTicker(time.Millisecond * 50)
defer ticker.Stop()
for range ticker.C {
output, _, err := util.RemoteRun(cli, "kubevpn version", nil)
if err != nil {
continue
}
version := getDaemonVersionFromOutput(output)
if version != "" && version != "unknown" {
break
}
}
}
func getDaemonVersionFromOutput(output []byte) (version string) {
type Data struct {
DaemonVersion string `json:"DaemonVersion"`
}
// remove first line
buf := bufio.NewReader(bytes.NewReader(output))
_, _, _ = buf.ReadLine()
restBytes, err := io.ReadAll(buf)
if err != nil {
return
}
jsonBytes, err := yaml.YAMLToJSON(restBytes)
if err != nil {
return
}
var data Data
err = json.Unmarshal(jsonBytes, &data)
if err != nil {
return
}
return data.DaemonVersion
}
func (w *wsHandler) terminal(ctx context.Context, cli *ssh.Client, conn *websocket.Conn) error {
session, err := cli.NewSession()
if err != nil {
@@ -187,11 +227,17 @@ func (w *wsHandler) terminal(ctx context.Context, cli *ssh.Client, conn *websock
return session.Wait()
}
func (w *wsHandler) installKubevpnOnRemote(ctx context.Context, sshClient *ssh.Client) error {
cmd := `hash kubevpn || type kubevpn || which kubevpn || command -v kubevpn`
_, _, err := util.RemoteRun(sshClient, cmd, nil)
func (w *wsHandler) installKubevpnOnRemote(ctx context.Context, sshClient *ssh.Client) (err error) {
defer func() {
if err == nil {
w.Log("Remote kubevpn command found, use it")
startDaemonProcess(sshClient)
}
}()
cmd := "kubevpn version"
_, _, err = util.RemoteRun(sshClient, cmd, nil)
if err == nil {
w.Log("Found command kubevpn command on remote")
return nil
}
log.Infof("remote kubevpn command not found, try to install it...")
@@ -249,12 +295,7 @@ func (w *wsHandler) installKubevpnOnRemote(ctx context.Context, sshClient *ssh.C
"sudo mv ~/.kubevpn/kubevpn /usr/local/bin/kubevpn",
}
err = util.SCPAndExec(w.conn, w.conn, sshClient, tempBin.Name(), "kubevpn", cmds...)
if err != nil {
return err
}
// try to startup daemon process
go util.RemoteRun(sshClient, "kubevpn get pods", nil)
return nil
}
func (w *wsHandler) Log(format string, a ...any) {

View File

@@ -0,0 +1,49 @@
package handler
import "testing"
func TestGetVersionFromOutput(t *testing.T) {
tests := []struct {
output string
version string
}{
{
output: `KubeVPN: CLI
Version: v2.2.3
DaemonVersion: v2.2.3
Image: docker.io/naison/kubevpn:v2.2.3
Branch: feat/ssh-heartbeat
Git commit: 1272e86a337d3075427ee3a1c3681d378558d133
Built time: 2024-03-08 17:14:49
Built OS/Arch: darwin/arm64
Built Go version: go1.20.5`,
version: "v2.2.3",
},
{
output: `KubeVPN: CLI
Version: v2.2.3
DaemonVersion: unknown
Image: docker.io/naison/kubevpn:v2.2.3
Branch: feat/ssh-heartbeat
Git commit: 1272e86a337d3075427ee3a1c3681d378558d133
Built time: 2024-03-08 17:14:49
Built OS/Arch: darwin/arm64
Built Go version: go1.20.5`,
version: "unknown",
},
{
output: "hello",
version: "",
},
{
output: "",
version: "",
},
}
for _, test := range tests {
version := getDaemonVersionFromOutput([]byte(test.output))
if version != test.version {
t.Failed()
}
}
}

View File

@@ -80,9 +80,15 @@ func (s *SshConfig) ToRPC() *rpc.SshJump {
}
// DialSshRemote https://github.com/golang/go/issues/21478
func DialSshRemote(conf *SshConfig) (*ssh.Client, error) {
var remote *ssh.Client
var err error
func DialSshRemote(conf *SshConfig) (remote *ssh.Client, err error) {
defer func() {
if err != nil {
if remote != nil {
remote.Close()
}
}
}()
if conf.ConfigAlias != "" {
remote, err = jumpRecursion(conf.ConfigAlias)
} else {
@@ -141,7 +147,7 @@ func DialSshRemote(conf *SshConfig) (*ssh.Client, error) {
// ref: https://github.com/golang/go/issues/21478
if err == nil {
go func() {
ticker := time.NewTicker(time.Second * 5)
ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()
defer remote.Close()
for range ticker.C {
@@ -237,6 +243,13 @@ func copyStream(local net.Conn, remote net.Conn) {
}
func jumpRecursion(name string) (client *ssh.Client, err error) {
defer func() {
if err != nil {
if client != nil {
client.Close()
}
}
}()
var jumper = "ProxyJump"
var bastionList = []*SshConfig{getBastion(name)}
for {
@@ -309,18 +322,32 @@ func dial(from *SshConfig) (*ssh.Client, error) {
})
}
func jump(bClient *ssh.Client, to *SshConfig) (*ssh.Client, error) {
func jump(bClient *ssh.Client, to *SshConfig) (client *ssh.Client, err error) {
var authMethod ssh.AuthMethod
authMethod, err = publicKeyFile(to.Keyfile)
if err != nil {
return
}
// Dial a connection to the service host, from the bastion
conn, err := bClient.Dial("tcp", to.Addr)
var conn net.Conn
conn, err = bClient.Dial("tcp", to.Addr)
if err != nil {
return nil, err
return
}
authMethod, err := publicKeyFile(to.Keyfile)
defer func() {
if err != nil {
return nil, err
if client != nil {
client.Close()
}
ncc, chans, reqs, err := ssh.NewClientConn(conn, to.Addr, &ssh.ClientConfig{
if conn != nil {
conn.Close()
}
}
}()
var ncc ssh.Conn
var chans <-chan ssh.NewChannel
var reqs <-chan *ssh.Request
ncc, chans, reqs, err = ssh.NewClientConn(conn, to.Addr, &ssh.ClientConfig{
User: to.User,
Auth: []ssh.AuthMethod{authMethod},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
@@ -328,11 +355,11 @@ func jump(bClient *ssh.Client, to *SshConfig) (*ssh.Client, error) {
Timeout: time.Second * 10,
})
if err != nil {
return nil, err
return
}
sClient := ssh.NewClient(ncc, chans, reqs)
return sClient, nil
client = ssh.NewClient(ncc, chans, reqs)
return
}
type conf []*ssh_config.Config