feat: guacd

This commit is contained in:
ttk
2024-02-23 17:55:01 +08:00
parent 502048c165
commit 15357ab6b4
4 changed files with 173 additions and 149 deletions

View File

@@ -65,36 +65,15 @@ func (c *Controller) Connecting(ctx *gin.Context) {
defer ws.Close()
defer func() {
if err == nil {
return
}
logger.L.Debug("connecting failed", zap.String("session_id", sessionId), zap.Error(err))
ae, ok := err.(*ApiError)
if !ok {
return
}
lang := ctx.PostForm("lang")
accept := ctx.GetHeader("Accept-Language")
localizer := i18n.NewLocalizer(conf.Bundle, lang, accept)
ws.WriteMessage(websocket.TextMessage, []byte(ae.Message(localizer)))
handleError(ctx, sessionId, err, ws)
}()
v, ok := onlineSession.Load(sessionId)
if !ok {
err = &ApiError{Code: ErrInvalidSessionId, Data: map[string]any{"sessionId": sessionId}}
return
}
session, ok := v.(*model.Session)
if !ok {
err = &ApiError{Code: ErrLoadSession, Data: map[string]any{"err": "invalid type"}}
return
}
if session.Connected.Load() {
err = &ApiError{Code: ErrInvalidSessionId, Data: map[string]any{"sessionId": sessionId}}
session, err := loadOnlineSessionById(sessionId)
if err != nil {
return
}
session.Connected.CompareAndSwap(false, true)
if strings.HasPrefix(session.Protocol, "ssh") {
if session.IsSsh() {
err = handleSsh(ctx, ws, session)
} else {
err = handleGuacd(ctx, ws, session)
@@ -108,23 +87,24 @@ func handleSsh(ctx *gin.Context, ws *websocket.Conn, session *model.Session) (er
}()
chs.WindowChan <- fmt.Sprintf("%s,%s,%s", ctx.Query("w"), ctx.Query("h"), ctx.Query("dpi"))
tk, tk1s := time.NewTicker(time.Millisecond*100), time.NewTicker(time.Second)
g := &errgroup.Group{}
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
return readWsMsg(ctx, ws, chs)
return readWsMsg(gctx, ws, chs)
})
g.Go(func() error {
for {
select {
case <-ctx.Done():
case <-gctx.Done():
return nil
case closeBy := <-chs.CloseChan:
out := []byte("\r\n \033[31m closed by admin")
ws.WriteMessage(websocket.TextMessage, out)
writeToMonitors(session.Monitors, out)
logger.L.Warn("close by admin", zap.String("username", closeBy))
return nil
err := fmt.Errorf("colse by admin %s", closeBy)
logger.L.Warn(err.Error())
return err
case err := <-chs.ErrChan:
logger.L.Error("disconnected", zap.Error(err))
logger.L.Error("server disconnected", zap.Error(err))
return err
case in := <-chs.InChan:
rt := in[0]
@@ -164,14 +144,13 @@ func handleGuacd(ctx *gin.Context, ws *websocket.Conn, session *model.Session) (
g.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case closeBy := <-chs.CloseChan:
out := []byte("\r\n \033[31m closed by admin")
ws.WriteMessage(websocket.TextMessage, out)
writeToMonitors(session.Monitors, out)
logger.L.Warn("close by admin", zap.String("username", closeBy))
return nil
err := fmt.Errorf("colse by admin %s", closeBy)
logger.L.Warn(err.Error())
return err
case err := <-chs.ErrChan:
logger.L.Error("disconnected", zap.Error(err))
return err
@@ -208,16 +187,14 @@ func sendMsg(ws *websocket.Conn, session *model.Session, chs *model.SessionChans
// @Success 200 {object} HttpResponse{data=model.Session}
// @Router /connect/:asset_id/:account_id/:protocol [post]
func (c *Controller) Connect(ctx *gin.Context) {
w, h, dpi, protocol, chs := cast.ToInt(ctx.Query("w")), cast.ToInt(ctx.Query("h")), cast.ToInt(ctx.Query("dpi")), ctx.Param("protocol"), makeChans()
protocol, chs := ctx.Param("protocol"), makeChans()
sessionId, resp := "", &model.ServerResp{}
switch strings.Split(protocol, ":")[0] {
case "ssh":
go doSsh(ctx, w, h, newSshReq(ctx, model.SESSIONACTION_NEW), chs)
go doSsh(ctx, newSshReq(ctx, model.SESSIONACTION_NEW), chs)
case "vnc", "rdp":
w, h, dpi = cast.ToInt(ctx.Query("screen_width")), cast.ToInt(ctx.Query("screen_height")), cast.ToInt(ctx.Query("screen_dpi"))
w, h, dpi = 731, 929, 128
go doGuacd(ctx, "", w, h, dpi, protocol, chs)
go doGuacd(ctx, "", protocol, chs)
default:
logger.L.Error("wrong protocol " + protocol)
}
@@ -253,7 +230,7 @@ func readWsMsg(ctx context.Context, ws *websocket.Conn, chs *model.SessionChans)
for {
select {
case <-ctx.Done():
return fmt.Errorf("ctx done")
return nil
default:
t, msg, err := ws.ReadMessage()
if err != nil {
@@ -271,8 +248,8 @@ func readWsMsg(ctx context.Context, ws *websocket.Conn, chs *model.SessionChans)
}
}
func doSsh(ctx *gin.Context, w, h int, req *model.SshReq, chs *model.SessionChans) {
var err error
func doSsh(ctx *gin.Context, req *model.SshReq, chs *model.SessionChans) (err error) {
w, h := cast.ToInt(ctx.Query("w")), cast.ToInt(ctx.Query("h"))
defer func() {
chs.ErrChan <- err
}()
@@ -343,34 +320,38 @@ func doSsh(ctx *gin.Context, w, h int, req *model.SshReq, chs *model.SessionChan
chs.ErrChan <- nil
chs.RespChan <- resp
waitChan := make(chan error)
g := errgroup.Group{}
g, gctx := errgroup.WithContext(context.Background())
g.Go(func() error {
waitChan <- sess.Wait()
return nil
// TODO
return sess.Wait()
})
g.Go(func() error {
for {
rn, size, err := buf.ReadRune()
if err != nil {
logger.L.Debug("buf ReadRune failed", zap.Error(err))
return err
select {
case <-gctx.Done():
return nil
default:
rn, size, err := buf.ReadRune()
if err != nil {
logger.L.Debug("buf ReadRune failed", zap.Error(err))
return err
}
if size <= 0 || rn == utf8.RuneError {
continue
}
p := make([]byte, utf8.RuneLen(rn))
utf8.EncodeRune(p, rn)
chs.OutChan <- p
}
if size <= 0 || rn == utf8.RuneError {
continue
}
p := make([]byte, utf8.RuneLen(rn))
utf8.EncodeRune(p, rn)
chs.OutChan <- p
}
})
g.Go(func() error {
for {
select {
case err = <-waitChan:
return err
case <-gctx.Done():
return nil
case <-chs.AwayChan:
return fmt.Errorf("away")
return nil
case s := <-chs.WindowChan:
wh := strings.Split(s, ",")
if len(wh) < 2 {
@@ -390,6 +371,8 @@ func doSsh(ctx *gin.Context, w, h int, req *model.SshReq, chs *model.SessionChan
if err = g.Wait(); err != nil {
logger.L.Warn("doSsh stopped", zap.Error(err))
}
return
}
func makeChans() *model.SessionChans {
@@ -424,7 +407,9 @@ func newSshReq(ctx *gin.Context, action int) *model.SshReq {
}
}
func doGuacd(ctx *gin.Context, connection string, w, h, dpi int, protocol string, chs *model.SessionChans) {
func doGuacd(ctx *gin.Context, connection string, protocol string, chs *model.SessionChans) {
w, h, dpi := cast.ToInt(ctx.Query("w")), cast.ToInt(ctx.Query("h")), cast.ToInt(ctx.Query("dpi"))
w, h, dpi = 731, 929, 128 //TODO
currentUser, _ := acl.GetSessionFromCtx(ctx)
var err error
@@ -458,19 +443,24 @@ func doGuacd(ctx *gin.Context, connection string, w, h, dpi int, protocol string
logger.L.Error("guacd tunnel failed", zap.Error(err))
return
}
if err = t.Handshake(); err != nil {
logger.L.Error("guacd handshake failed", zap.Error(err))
return
}
session := newGuacdSession(ctx, t.ConnectionId, asset, account, gateway)
session := newGuacdSession(ctx, t.SessionId, asset, account, gateway)
if err = handleUpsertSession(ctx, session); err != nil {
return
}
defer func() {
session.Status = model.SESSIONSTATUS_OFFLINE
session.ClosedAt = lo.ToPtr(time.Now())
if err = handleUpsertSession(ctx, session); err != nil {
logger.L.Error("offline guacd session failed", zap.Error(err))
return
}
}()
resp := &model.ServerResp{
Code: lo.Ternary(err == nil, 0, -1),
Message: lo.TernaryF(err == nil, func() string { return "" }, func() string { return err.Error() }),
SessionId: t.ConnectionId,
SessionId: t.SessionId,
Uid: currentUser.GetUid(),
UserName: currentUser.GetUserName(),
}
@@ -478,30 +468,32 @@ func doGuacd(ctx *gin.Context, connection string, w, h, dpi int, protocol string
chs.ErrChan <- nil
chs.RespChan <- resp
defer func() {
session.Status = model.SESSIONSTATUS_OFFLINE
session.ClosedAt = lo.ToPtr(time.Now())
}()
g := &errgroup.Group{}
g, gctx := errgroup.WithContext(context.Background())
g.Go(func() error {
for {
p, err := t.Read()
if err != nil {
logger.L.Debug("read instruction failed", zap.Error(err))
return err
select {
case <-gctx.Done():
return nil
default:
p, err := t.Read()
if err != nil {
logger.L.Debug("read instruction failed", zap.Error(err))
return err
}
if len(p) <= 0 || bytes.HasPrefix(p, guacd.InternalOpcodeIns) {
continue
}
chs.OutChan <- p
}
if len(p) <= 0 || bytes.HasPrefix(p, guacd.InternalOpcodeIns) {
continue
}
chs.OutChan <- p
}
})
g.Go(func() error {
for {
select {
case <-gctx.Done():
return nil
case <-chs.AwayChan:
return fmt.Errorf("away")
return nil
case in := <-chs.InChan:
if !bytes.HasPrefix(in, guacd.InternalOpcodeIns) {
t.Write(in)
@@ -562,19 +554,7 @@ func (c *Controller) ConnectMonitor(ctx *gin.Context) {
defer ws.Close()
defer func() {
if err == nil {
return
}
logger.L.Debug("monitor failed", zap.String("session_id", sessionId), zap.Error(err))
ae, ok := err.(*ApiError)
if !ok {
return
}
lang := ctx.PostForm("lang")
accept := ctx.GetHeader("Accept-Language")
localizer := i18n.NewLocalizer(conf.Bundle, lang, accept)
ws.WriteMessage(websocket.TextMessage, []byte(ae.Message(localizer)))
ctx.AbortWithError(http.StatusBadRequest, err)
handleError(ctx, sessionId, err, ws)
}()
if !acl.IsAdmin(currentUser) {
@@ -582,44 +562,27 @@ func (c *Controller) ConnectMonitor(ctx *gin.Context) {
return
}
session := &model.Session{}
err = mysql.DB.
Where("session_id = ?", sessionId).
Where("status = ?", model.SESSIONSTATUS_ONLINE).
First(session).
Error
session, err := loadOnlineSessionById(sessionId)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
onlineSession.Delete(sessionId)
}
ctx.AbortWithError(http.StatusBadRequest, &ApiError{Code: ErrInvalidSessionId, Data: map[string]any{"sessionId": sessionId}})
return
}
v, ok := onlineSession.Load(sessionId)
if !ok {
ctx.AbortWithError(http.StatusBadRequest, &ApiError{Code: ErrInvalidSessionId, Data: map[string]any{"sessionId": sessionId}})
return
}
session, ok = v.(*model.Session)
if !ok {
ctx.AbortWithError(http.StatusBadRequest, &ApiError{Code: ErrInvalidSessionId, Data: map[string]any{"sessionId": sessionId}})
return
}
g := &errgroup.Group{}
switch session.SessionType {
case model.SESSIONTYPE_WEB:
if !session.IsSsh() {
//TODO
}
case model.SESSIONTYPE_CLIENT:
cur := false
session.Monitors.Range(func(key, value any) bool {
cur = true
return !cur
})
if !cur {
// clinet only has ssh type
if !session.HasMonitors() {
req := newSshReq(ctx, model.SESSIONACTION_MONITOR)
req.SessionId = sessionId
chs := makeChans()
session.Chans = chs
logger.L.Debug("connect to monitor client", zap.String("sessionId", sessionId))
go doSsh(ctx, cast.ToInt(ctx.Query("w")), cast.ToInt(ctx.Query("h")), req, chs)
go doSsh(ctx, req, chs)
if err = <-chs.ErrChan; err != nil {
err = &ApiError{Code: ErrConnectServer, Data: map[string]any{"err": err}}
return
@@ -630,33 +593,34 @@ func (c *Controller) ConnectMonitor(ctx *gin.Context) {
return
}
tk := time.NewTicker(time.Millisecond * 100)
defer sendMsg(nil, session, chs)
go func() {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return
case closeBy := <-chs.CloseChan:
writeToMonitors(session.Monitors, []byte("\r\n \033[31m closed by admin"))
logger.L.Warn("close by admin", zap.String("username", closeBy))
return
return nil
case err := <-chs.ErrChan:
logger.L.Error("ssh connection failed", zap.Error(err))
return
return err
case out := <-chs.OutChan:
chs.Buf.Write(out)
case <-tk.C:
sendMsg(nil, session, chs)
}
}
}()
})
}
}
session.Monitors.Store(key, ws)
defer func() {
session.Monitors.Delete(key)
if session.IsSsh() && !session.HasMonitors() {
close(session.Chans.AwayChan)
}
}()
for {
_, _, err = ws.ReadMessage()
if err != nil {
@@ -695,11 +659,11 @@ func (c *Controller) ConnectClose(ctx *gin.Context) {
}
logger.L.Info("closing...", zap.String("sessionId", session.SessionId), zap.Int("type", session.SessionType))
defer doOfflineOnlineSession(ctx, session.SessionId, currentUser.GetUserName())
defer offlineSession(ctx, session.SessionId, currentUser.GetUserName())
chs := makeChans()
req := newSshReq(ctx, model.SESSIONACTION_CLOSE)
req.SessionId = session.SessionId
go doSsh(ctx, cast.ToInt(ctx.Query("w")), cast.ToInt(ctx.Query("h")), req, chs)
go doSsh(ctx, req, chs)
if err = <-chs.ErrChan; err != nil {
ctx.AbortWithError(http.StatusInternalServerError, &ApiError{Code: ErrConnectServer, Data: map[string]any{"err": err}})
return
@@ -713,7 +677,7 @@ func (c *Controller) ConnectClose(ctx *gin.Context) {
ctx.JSON(http.StatusOK, defaultHttpResponse)
}
func doOfflineOnlineSession(ctx *gin.Context, sessionId string, closer string) {
func offlineSession(ctx *gin.Context, sessionId string, closer string) {
logger.L.Debug("offline", zap.String("session_id", sessionId), zap.String("closer", closer))
defer onlineSession.Delete(sessionId)
v, ok := onlineSession.Load(sessionId)
@@ -795,7 +759,6 @@ func (c *Controller) TestConnect(ctx *gin.Context) {
},
})
ctx.Params = append(ctx.Params, gin.Param{Key: "asset_id", Value: "1"}, gin.Param{Key: "account_id", Value: "1"}, gin.Param{Key: "protocol", Value: "rdp:13389"})
fmt.Println("----------ing", ctx.Query("screen_width"), ctx.Query("screen_height"), ctx.Query("screen_dpi"))
c.Connect(ctx)
}
@@ -821,6 +784,40 @@ func (c *Controller) TestConnecting(ctx *gin.Context) {
NickName: "",
},
})
fmt.Println("----------", ctx.Query("screen_width"), ctx.Query("screen_height"), ctx.Query("screen_dpi"))
c.Connecting(ctx)
}
func loadOnlineSessionById(sessionId string) (session *model.Session, err error) {
v, ok := onlineSession.Load(sessionId)
if !ok {
err = &ApiError{Code: ErrInvalidSessionId, Data: map[string]any{"sessionId": sessionId}}
return
}
session, ok = v.(*model.Session)
if !ok {
err = &ApiError{Code: ErrLoadSession, Data: map[string]any{"err": "invalid type"}}
return
}
if session.Connected.Load() {
err = &ApiError{Code: ErrInvalidSessionId, Data: map[string]any{"sessionId": sessionId}}
return
}
return
}
func handleError(ctx *gin.Context, sessionId string, err error, ws *websocket.Conn) {
if err == nil {
return
}
logger.L.Debug("monitor failed", zap.String("session_id", sessionId), zap.Error(err))
ae, ok := err.(*ApiError)
if !ok {
return
}
lang := ctx.PostForm("lang")
accept := ctx.GetHeader("Accept-Language")
localizer := i18n.NewLocalizer(conf.Bundle, lang, accept)
ws.WriteMessage(websocket.TextMessage, []byte(ae.Message(localizer)))
ctx.AbortWithError(http.StatusBadRequest, err)
}

View File

@@ -71,7 +71,7 @@ func Init() (err error) {
ctx := &gin.Context{}
for _, s := range sessions {
if s.SessionType == model.SESSIONTYPE_WEB {
doOfflineOnlineSession(ctx, s.SessionId, "")
offlineSession(ctx, s.SessionId, "")
continue
}
s.Monitors = &sync.Map{}

View File

@@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/google/uuid"
"github.com/samber/lo"
"github.com/spf13/cast"
"github.com/veops/oneterm/pkg/conf"
@@ -14,7 +15,11 @@ import (
"github.com/veops/oneterm/pkg/util"
)
const Version = "VERSION_1_5_0"
const (
recordingPath = "/playback"
createRecording = "true"
ignoreCert = "true"
)
type Configuration struct {
Protocol string
@@ -28,10 +33,11 @@ func NewConfiguration() (config *Configuration) {
}
type Tunnel struct {
SessionId string
ConnectionId string
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
ConnectionId string
Config *Configuration
}
@@ -50,27 +56,33 @@ func NewTunnel(connectionId string, w, h, dpi int, protocol string, asset *model
Config: &Configuration{
Protocol: protocol,
Parameters: map[string]string{
"width": cast.ToString(w),
"height": cast.ToString(h),
"dpi": cast.ToString(dpi),
"scheme": protocol,
"hostname": asset.Ip,
"port": port,
"ignore-cert": "true",
"security": "",
"username": account.Account,
"password": util.DecryptAES(account.Password),
"recording-path": recordingPath,
"create-recording-path": createRecording,
"ignore-cert": ignoreCert,
"width": cast.ToString(w),
"height": cast.ToString(h),
"dpi": cast.ToString(dpi),
"scheme": protocol,
"hostname": asset.Ip,
"port": port,
"username": account.Account,
"password": util.DecryptAES(account.Password),
},
},
}
if t.ConnectionId == "" {
t.SessionId = uuid.New().String()
}
err = t.handshake()
return
}
// Handshake
// handshake
//
// https://guacamole.apache.org/doc/gug/guacamole-protocol.html#handshake-phase
func (t *Tunnel) Handshake() (err error) {
func (t *Tunnel) handshake() (err error) {
defer func() {
if err != nil {
t.conn.Close()

View File

@@ -3,6 +3,7 @@ package model
import (
"bytes"
"io"
"strings"
"sync"
"sync/atomic"
"time"
@@ -56,6 +57,20 @@ func (m *Session) TableName() string {
return "session"
}
func (m *Session) IsSsh() bool {
return strings.HasPrefix(m.Protocol, "ssh")
}
func (m *Session) HasMonitors() (has bool) {
m.Monitors.Range(func(key, value any) bool {
has = true
return false
})
return
}
type SessionCmd struct {
Id int `json:"id" gorm:"column:id;primarykey"`
SessionId string `json:"session_id" gorm:"column:session_id"`