实现code-server的连接创建逻辑

This commit is contained in:
lwch
2022-07-22 18:24:50 +08:00
parent 5526cffdd3
commit d4295efdab
8 changed files with 284 additions and 15 deletions

View File

@@ -2,6 +2,7 @@ package app
import (
"os"
"path/filepath"
rt "runtime"
"github.com/kardianos/service"
@@ -42,11 +43,21 @@ func (a *App) Stop(s service.Service) error {
return nil
}
func (a *App) clean() {
files, err := filepath.Glob(filepath.Join(a.cfg.TmpDir, "*"))
runtime.Assert(err)
for _, file := range files {
os.RemoveAll(file)
}
}
func (a *App) run() {
// go func() {
// http.ListenAndServe(":9000", nil)
// }()
a.clean()
stdout := true
if rt.GOOS == "windows" {
stdout = false
@@ -79,7 +90,7 @@ func (a *App) run() {
mgr.Add(b)
go b.Handle(a.conn)
case "code-server":
cs := code.New(t)
cs := code.New(t, a.cfg.ReadTimeout, a.cfg.WriteTimeout)
mgr.Add(cs)
go cs.Handle(a.conn)
}
@@ -98,6 +109,8 @@ func (a *App) run() {
a.vncCreate(a.confDir, mgr, a.conn, msg)
case network.ConnectRequest_bench:
a.benchCreate(a.confDir, mgr, a.conn, msg)
case network.ConnectRequest_code:
a.codeCreate(a.confDir, mgr, a.conn, msg)
}
default:
linkID = msg.GetLinkId()

View File

@@ -5,6 +5,7 @@ import (
"github.com/lwch/natpass/code/client/conn"
"github.com/lwch/natpass/code/client/global"
"github.com/lwch/natpass/code/client/rule"
"github.com/lwch/natpass/code/client/rule/code"
"github.com/lwch/natpass/code/client/rule/shell"
"github.com/lwch/natpass/code/client/rule/vnc"
"github.com/lwch/natpass/code/network"
@@ -71,3 +72,28 @@ func (a *App) benchCreate(confDir string, mgr *rule.Mgr, conn *conn.Conn, msg *n
msg.GetFrom(), a.cfg.ID)
conn.SendConnectOK(msg.GetFrom(), msg.GetLinkId())
}
func (a *App) codeCreate(confDir string, mgr *rule.Mgr, conn *conn.Conn, msg *network.Msg) {
create := msg.GetCreq()
tn := mgr.GetLinked(create.GetName(), msg.GetFrom())
if tn == nil {
tn = code.New(global.Rule{
Name: create.GetName(),
Target: msg.GetFrom(),
Type: "code-server",
}, a.cfg.ReadTimeout, a.cfg.WriteTimeout)
mgr.Add(tn.(rule.Rule))
}
workspace := tn.NewLink(msg.GetLinkId(), msg.GetFrom(), nil, conn).(*code.Workspace)
logging.Info("create link %s for code-server rule [%s] from %s to %s",
msg.GetLinkId(), create.GetName(),
msg.GetFrom(), a.cfg.ID)
err := workspace.Exec(a.cfg.TmpDir)
if err != nil {
logging.Error("create vnc failed: %v", err)
conn.SendConnectError(msg.GetFrom(), msg.GetLinkId(), err.Error())
return
}
conn.SendConnectOK(msg.GetFrom(), msg.GetLinkId())
workspace.Forward()
}

View File

@@ -53,6 +53,13 @@ func (conn *Conn) SendConnectReq(id string, cfg global.Rule) {
XType: network.ConnectRequest_bench,
},
}
case "code-server":
msg.Payload = &network.Msg_Creq{
Creq: &network.ConnectRequest{
Name: cfg.Name,
XType: network.ConnectRequest_code,
},
}
}
select {
case conn.write <- &msg:

View File

@@ -2,25 +2,36 @@ package code
import (
"fmt"
"net"
"net/http"
"sync"
"time"
"github.com/lwch/logging"
"github.com/lwch/natpass/code/client/conn"
"github.com/lwch/natpass/code/client/global"
"github.com/lwch/natpass/code/client/rule"
"github.com/lwch/runtime"
)
// Code code-server handler
type Code struct {
Name string
cfg global.Rule
sync.RWMutex
Name string
cfg global.Rule
workspace map[string]*Workspace
readTimeout time.Duration
writeTimeout time.Duration
}
// New new code-server handler
func New(cfg global.Rule) *Code {
func New(cfg global.Rule, readTimeout, writeTimeout time.Duration) *Code {
return &Code{
Name: cfg.Name,
cfg: cfg,
Name: cfg.Name,
cfg: cfg,
workspace: make(map[string]*Workspace),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
}
}
@@ -39,6 +50,37 @@ func (code *Code) GetPort() uint16 {
return code.cfg.LocalPort
}
// GetTarget get target of this rule
func (code *Code) GetTarget() string {
return code.cfg.Target
}
// GetLinks get rule links
func (code *Code) GetLinks() []rule.Link {
ret := make([]rule.Link, 0, len(code.workspace))
code.RLock()
for _, link := range code.workspace {
ret = append(ret, link)
}
code.RUnlock()
return ret
}
// GetRemote get remote target name
func (code *Code) GetRemote() string {
return code.cfg.Target
}
// NewLink new link
func (code *Code) NewLink(id, remote string, localConn net.Conn, remoteConn *conn.Conn) rule.Link {
remoteConn.AddLink(id)
ws := newWorkspace(code, id, code.cfg.Name, remote, remoteConn)
code.Lock()
code.workspace[ws.id] = ws
code.Unlock()
return ws
}
// Handle handle code-server
func (code *Code) Handle(c *conn.Conn) {
defer func() {
@@ -52,6 +94,7 @@ func (code *Code) Handle(c *conn.Conn) {
}
}
mux := http.NewServeMux()
mux.HandleFunc("/new", pf(code.New))
mux.HandleFunc("/", pf(code.Forward))
svr := &http.Server{
Addr: fmt.Sprintf("%s:%d", code.cfg.LocalAddr, code.cfg.LocalPort),

View File

@@ -0,0 +1,56 @@
package code
import (
"fmt"
"net/http"
"time"
"github.com/lwch/logging"
"github.com/lwch/natpass/code/client/conn"
"github.com/lwch/natpass/code/network"
"github.com/lwch/runtime"
)
// New new code-server workspace
func (code *Code) New(conn *conn.Conn, w http.ResponseWriter, r *http.Request) {
id, err := runtime.UUID(16, "0123456789abcdef")
if err != nil {
logging.Error("failed to generate link_id for code-server: %s, err=%v",
code.Name, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
link := code.NewLink(id, code.cfg.Target, nil, conn).(*Workspace)
conn.SendConnectReq(id, code.cfg)
ch := conn.ChanRead(id)
timeout := time.After(code.readTimeout)
var repMsg *network.Msg
for {
var msg *network.Msg
select {
case msg = <-ch:
case <-timeout:
logging.Error("create code-server %s by rule %s failed, timtout", link.id, link.parent.Name)
http.Error(w, "timeout", http.StatusBadGateway)
return
}
if msg.GetXType() != network.Msg_connect_rep {
conn.Reset(id, msg)
time.Sleep(code.readTimeout / 10)
continue
}
rep := msg.GetCrep()
if !rep.GetOk() {
logging.Error("create code-server %s by rule %s failed, err=%s",
link.id, link.parent.Name, rep.GetMsg())
http.Error(w, rep.GetMsg(), http.StatusBadGateway)
return
}
repMsg = msg
break
}
logging.Info("create link %s for code-server rule [%s] from %s to %s",
link.GetID(), code.cfg.Name,
repMsg.GetTo(), repMsg.GetFrom())
fmt.Fprint(w, id)
}

View File

@@ -0,0 +1,120 @@
package code
import (
"bufio"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/lwch/logging"
"github.com/lwch/natpass/code/client/conn"
)
// Workspace workspace of code-server
type Workspace struct {
parent *Code
id string
target string
name string
exec *exec.Cmd
remote *conn.Conn
// runtime
sendBytes uint64
recvBytes uint64
sendPacket uint64
recvPacket uint64
}
func newWorkspace(parent *Code, id, name, target string, remote *conn.Conn) *Workspace {
name = strings.ReplaceAll(name, "/", "_")
name = strings.ReplaceAll(name, "\\", "_")
return &Workspace{
parent: parent,
id: id,
target: target,
name: name,
remote: remote,
}
}
// GetID get link id
func (ws *Workspace) GetID() string {
return ws.id
}
// GetBytes get send and recv bytes
func (ws *Workspace) GetBytes() (uint64, uint64) {
return ws.recvBytes, ws.sendBytes
}
// GetPackets get send and recv packets
func (ws *Workspace) GetPackets() (uint64, uint64) {
return ws.recvPacket, ws.sendPacket
}
// Exec execute code-server
func (ws *Workspace) Exec(dir string) error {
workdir := filepath.Join(dir, ws.name)
err := os.MkdirAll(workdir, 0755)
if err != nil {
logging.Error("can not create work dir[%s]: %v", workdir, err)
return err
}
ws.exec = exec.Command("code-server", "--disable-update-check",
"--auth", "none",
"--socket", filepath.Join(workdir, ws.id+".sock"),
"--user-data-dir", filepath.Join(workdir, "data"),
"--extensions-dir", filepath.Join(workdir, "extensions"))
stdout, err := ws.exec.StdoutPipe()
if err != nil {
logging.Error("can not get stdout pipe for link [%s] name [%s]", ws.id, ws.name)
return err
}
stderr, err := ws.exec.StderrPipe()
if err != nil {
logging.Error("can not get stderr pipe for link [%s] name [%s]", ws.id, ws.name)
return err
}
err = ws.exec.Start()
if err != nil {
logging.Error("can not start code-server for link [%s] name [%s]", ws.id, ws.name)
return err
}
go ws.log(stdout, stderr)
return nil
}
// Close close workspace
func (ws *Workspace) Close() {
if ws.exec != nil && ws.exec.Process != nil {
ws.exec.Process.Kill()
}
ws.remote.SendDisconnect(ws.target, ws.id)
}
func (ws *Workspace) log(stdout, stderr io.ReadCloser) {
defer stdout.Close()
defer stderr.Close()
var wg sync.WaitGroup
wg.Add(2)
watch := func(target io.Reader) {
defer wg.Done()
s := bufio.NewScanner(target)
for s.Scan() {
logging.Info("code-server [%s] [%s]: %s", ws.id, ws.name, s.Text())
}
}
go watch(stdout)
go watch(stderr)
wg.Wait()
}
func (ws *Workspace) Forward() {
}

View File

@@ -28,6 +28,7 @@ const (
ConnectRequest_shell ConnectRequestType = 2 // shell
ConnectRequest_vnc ConnectRequestType = 3 // vnc
ConnectRequest_bench ConnectRequestType = 4 // benchmark
ConnectRequest_code ConnectRequestType = 5 // code-server
)
// Enum value maps for ConnectRequestType.
@@ -38,6 +39,7 @@ var (
2: "shell",
3: "vnc",
4: "bench",
5: "code",
}
ConnectRequestType_value = map[string]int32{
"tcp": 0,
@@ -45,6 +47,7 @@ var (
"shell": 2,
"vnc": 3,
"bench": 4,
"code": 5,
}
)
@@ -429,7 +432,7 @@ var file_connect_proto_rawDesc = []byte{
0x01, 0x28, 0x0d, 0x52, 0x03, 0x66, 0x70, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x71, 0x75, 0x61, 0x6c,
0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x71, 0x75, 0x61, 0x6c, 0x69,
0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01,
0x28, 0x08, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x22, 0xaa, 0x02, 0x0a, 0x0f, 0x63,
0x28, 0x08, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x22, 0xb4, 0x02, 0x0a, 0x0f, 0x63,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12,
0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61,
0x6d, 0x65, 0x12, 0x32, 0x0a, 0x05, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
@@ -444,16 +447,16 @@ var file_connect_proto_rawDesc = []byte{
0x06, 0x63, 0x73, 0x68, 0x65, 0x6c, 0x6c, 0x12, 0x2a, 0x0a, 0x04, 0x63, 0x76, 0x6e, 0x63, 0x18,
0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2e,
0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x76, 0x6e, 0x63, 0x48, 0x00, 0x52, 0x04, 0x63,
0x76, 0x6e, 0x63, 0x22, 0x37, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x74,
0x76, 0x6e, 0x63, 0x22, 0x41, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x74,
0x63, 0x70, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x75, 0x64, 0x70, 0x10, 0x01, 0x12, 0x09, 0x0a,
0x05, 0x73, 0x68, 0x65, 0x6c, 0x6c, 0x10, 0x02, 0x12, 0x07, 0x0a, 0x03, 0x76, 0x6e, 0x63, 0x10,
0x03, 0x12, 0x09, 0x0a, 0x05, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07,
0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x34, 0x0a, 0x10, 0x63, 0x6f, 0x6e, 0x6e, 0x65,
0x63, 0x74, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f,
0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x10, 0x0a, 0x03, 0x6d,
0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x42, 0x0c, 0x5a,
0x0a, 0x2e, 0x2f, 0x3b, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
0x03, 0x12, 0x09, 0x0a, 0x05, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x10, 0x04, 0x12, 0x08, 0x0a, 0x04,
0x63, 0x6f, 0x64, 0x65, 0x10, 0x05, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61,
0x64, 0x22, 0x34, 0x0a, 0x10, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x72, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28,
0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x2f, 0x3b, 0x6e, 0x65,
0x74, 0x77, 0x6f, 0x72, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -26,6 +26,7 @@ message connect_request {
shell = 2; // shell
vnc = 3; // vnc
bench = 4; // benchmark
code = 5; // code-server
}
string name = 1; // rule name
type _type = 2; // rule type