mirror of
https://github.com/veops/oneterm.git
synced 2025-10-25 08:32:40 +08:00
perf(backend): error tips
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gliderlabs/ssh"
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/samber/lo"
|
||||
@@ -27,6 +28,10 @@ import (
|
||||
"github.com/veops/oneterm/pkg/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
byteClearAll = []byte("\x15\r")
|
||||
)
|
||||
|
||||
func Connect(ctx *gin.Context) {
|
||||
ctx.Set("sessionType", model.SESSIONTYPE_WEB)
|
||||
|
||||
@@ -60,7 +65,7 @@ func Connect(ctx *gin.Context) {
|
||||
if sess.IsGuacd() {
|
||||
protocols.HandleGuacd(sess)
|
||||
} else {
|
||||
protocols.HandleTerm(sess)
|
||||
HandleTerm(sess, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -254,3 +259,109 @@ func DoConnect(ctx *gin.Context, ws *websocket.Conn) (sess *gsession.Session, er
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// HandleTerm handles terminal sessions
|
||||
func HandleTerm(sess *gsession.Session, ctx *gin.Context) (err error) {
|
||||
defer func() {
|
||||
logger.L().Debug("defer HandleTerm", zap.String("sessionId", sess.SessionId))
|
||||
sess.SshParser.Close(sess.Prompt)
|
||||
sess.Status = model.SESSIONSTATUS_OFFLINE
|
||||
sess.ClosedAt = lo.ToPtr(time.Now())
|
||||
if err = gsession.UpsertSession(sess); err != nil {
|
||||
logger.L().Error("offline session failed", zap.String("sessionId", sess.SessionId), zap.Error(err))
|
||||
return
|
||||
}
|
||||
}()
|
||||
chs := sess.Chans
|
||||
tk, tk1s, tk1m := time.NewTicker(time.Millisecond*100), time.NewTicker(time.Second), time.NewTicker(time.Minute)
|
||||
assetService := service.NewAssetService()
|
||||
sess.G.Go(func() error {
|
||||
return protocols.Read(sess)
|
||||
})
|
||||
sess.G.Go(func() (err error) {
|
||||
defer sess.Chans.Rin.Close()
|
||||
defer sess.Chans.Wout.Close()
|
||||
for {
|
||||
select {
|
||||
case <-sess.Gctx.Done():
|
||||
protocols.Write(sess)
|
||||
return
|
||||
case <-chs.AwayChan:
|
||||
return
|
||||
case <-sess.IdleTk.C:
|
||||
msg := (&myErrors.ApiError{Code: myErrors.ErrIdleTimeout, Data: map[string]any{"second": model.GlobalConfig.Load().Timeout}}).MessageWithCtx(ctx)
|
||||
protocols.WriteErrMsg(sess, msg)
|
||||
return &myErrors.ApiError{Code: myErrors.ErrIdleTimeout, Data: map[string]any{"second": model.GlobalConfig.Load().Timeout}}
|
||||
case <-tk1m.C:
|
||||
asset, err := assetService.GetById(sess.Gctx, sess.AssetId)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if protocols.CheckTime(asset.AccessAuth) && (sess.ShareId == 0 || time.Now().Before(sess.ShareEnd)) {
|
||||
continue
|
||||
}
|
||||
return &myErrors.ApiError{Code: myErrors.ErrAccessTime}
|
||||
case closeBy := <-chs.CloseChan:
|
||||
msg := (&myErrors.ApiError{Code: myErrors.ErrAdminClose, Data: map[string]any{"admin": closeBy}}).MessageWithCtx(ctx)
|
||||
protocols.WriteErrMsg(sess, msg)
|
||||
logger.L().Info("closed by", zap.String("admin", closeBy))
|
||||
return &myErrors.ApiError{Code: myErrors.ErrAdminClose, Data: map[string]any{"admin": closeBy}}
|
||||
case err = <-chs.ErrChan:
|
||||
protocols.WriteErrMsg(sess, err.Error())
|
||||
return
|
||||
case in := <-chs.InChan:
|
||||
if sess.SessionType == model.SESSIONTYPE_WEB {
|
||||
rt := in[0]
|
||||
msg := in[1:]
|
||||
switch rt {
|
||||
case '1':
|
||||
in = msg
|
||||
case '9':
|
||||
continue
|
||||
case 'w':
|
||||
wh := strings.Split(string(msg), ",")
|
||||
if len(wh) < 2 {
|
||||
continue
|
||||
}
|
||||
chs.WindowChan <- ssh.Window{
|
||||
Width: cast.ToInt(wh[0]),
|
||||
Height: cast.ToInt(wh[1]),
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
if cmd, forbidden := sess.SshParser.AddInput(in); forbidden {
|
||||
protocols.WriteErrMsg(sess, fmt.Sprintf("%s is forbidden\n", cmd))
|
||||
sess.SshParser.AddInput(byteClearAll)
|
||||
chs.Win.Write(byteClearAll)
|
||||
continue
|
||||
}
|
||||
if _, err = chs.Win.Write(in); err != nil {
|
||||
return
|
||||
}
|
||||
case out := <-chs.OutChan:
|
||||
if _, err = chs.OutBuf.Write(out); err != nil {
|
||||
return
|
||||
}
|
||||
sess.SshParser.AddOutput(out)
|
||||
case <-tk.C:
|
||||
if err = protocols.Write(sess); err != nil {
|
||||
return
|
||||
}
|
||||
case <-tk1s.C:
|
||||
if sess.Ws == nil {
|
||||
continue
|
||||
}
|
||||
if err = sess.Ws.WriteMessage(websocket.TextMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err = sess.G.Wait(); err != nil {
|
||||
logger.L().Debug("handle term wait end", zap.String("id", sess.SessionId), zap.Error(err))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3,14 +3,10 @@ package protocols
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gliderlabs/ssh"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/samber/lo"
|
||||
"github.com/spf13/cast"
|
||||
"go.uber.org/zap"
|
||||
gossh "golang.org/x/crypto/ssh"
|
||||
@@ -19,7 +15,6 @@ import (
|
||||
"github.com/veops/oneterm/internal/service"
|
||||
gsession "github.com/veops/oneterm/internal/session"
|
||||
"github.com/veops/oneterm/internal/tunneling"
|
||||
myErrors "github.com/veops/oneterm/pkg/errors"
|
||||
"github.com/veops/oneterm/pkg/logger"
|
||||
)
|
||||
|
||||
@@ -131,107 +126,3 @@ func ConnectSsh(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, ac
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// HandleTerm handles terminal sessions
|
||||
func HandleTerm(sess *gsession.Session) (err error) {
|
||||
defer func() {
|
||||
logger.L().Debug("defer HandleTerm", zap.String("sessionId", sess.SessionId))
|
||||
sess.SshParser.Close(sess.Prompt)
|
||||
sess.Status = model.SESSIONSTATUS_OFFLINE
|
||||
sess.ClosedAt = lo.ToPtr(time.Now())
|
||||
if err = gsession.UpsertSession(sess); err != nil {
|
||||
logger.L().Error("offline session failed", zap.String("sessionId", sess.SessionId), zap.Error(err))
|
||||
return
|
||||
}
|
||||
}()
|
||||
chs := sess.Chans
|
||||
tk, tk1s, tk1m := time.NewTicker(time.Millisecond*100), time.NewTicker(time.Second), time.NewTicker(time.Minute)
|
||||
assetService := service.NewAssetService()
|
||||
sess.G.Go(func() error {
|
||||
return Read(sess)
|
||||
})
|
||||
sess.G.Go(func() (err error) {
|
||||
defer sess.Chans.Rin.Close()
|
||||
defer sess.Chans.Wout.Close()
|
||||
for {
|
||||
select {
|
||||
case <-sess.Gctx.Done():
|
||||
Write(sess)
|
||||
return
|
||||
case <-chs.AwayChan:
|
||||
return
|
||||
case <-sess.IdleTk.C:
|
||||
WriteErrMsg(sess, "idle timeout\n\n")
|
||||
return &myErrors.ApiError{Code: myErrors.ErrIdleTimeout, Data: map[string]any{"second": model.GlobalConfig.Load().Timeout}}
|
||||
case <-tk1m.C:
|
||||
asset, err := assetService.GetById(sess.Gctx, sess.AssetId)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if CheckTime(asset.AccessAuth) && (sess.ShareId == 0 || time.Now().Before(sess.ShareEnd)) {
|
||||
continue
|
||||
}
|
||||
return &myErrors.ApiError{Code: myErrors.ErrAccessTime}
|
||||
case closeBy := <-chs.CloseChan:
|
||||
WriteErrMsg(sess, "closed by admin\n\n")
|
||||
logger.L().Info("closed by", zap.String("admin", closeBy))
|
||||
return &myErrors.ApiError{Code: myErrors.ErrAdminClose, Data: map[string]any{"admin": closeBy}}
|
||||
case err = <-chs.ErrChan:
|
||||
WriteErrMsg(sess, err.Error())
|
||||
return
|
||||
case in := <-chs.InChan:
|
||||
if sess.SessionType == model.SESSIONTYPE_WEB {
|
||||
rt := in[0]
|
||||
msg := in[1:]
|
||||
switch rt {
|
||||
case '1':
|
||||
in = msg
|
||||
case '9':
|
||||
continue
|
||||
case 'w':
|
||||
wh := strings.Split(string(msg), ",")
|
||||
if len(wh) < 2 {
|
||||
continue
|
||||
}
|
||||
chs.WindowChan <- ssh.Window{
|
||||
Width: cast.ToInt(wh[0]),
|
||||
Height: cast.ToInt(wh[1]),
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
if cmd, forbidden := sess.SshParser.AddInput(in); forbidden {
|
||||
WriteErrMsg(sess, fmt.Sprintf("%s is forbidden\n", cmd))
|
||||
sess.SshParser.AddInput(byteClearAll)
|
||||
chs.Win.Write(byteClearAll)
|
||||
continue
|
||||
}
|
||||
if _, err = chs.Win.Write(in); err != nil {
|
||||
return
|
||||
}
|
||||
case out := <-chs.OutChan:
|
||||
if _, err = chs.OutBuf.Write(out); err != nil {
|
||||
return
|
||||
}
|
||||
sess.SshParser.AddOutput(out)
|
||||
case <-tk.C:
|
||||
if err = Write(sess); err != nil {
|
||||
return
|
||||
}
|
||||
case <-tk1s.C:
|
||||
if sess.Ws == nil {
|
||||
continue
|
||||
}
|
||||
if err = sess.Ws.WriteMessage(websocket.TextMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err = sess.G.Wait(); err != nil {
|
||||
logger.L().Debug("handle term wait end", zap.String("id", sess.SessionId), zap.Error(err))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -30,7 +30,6 @@ var (
|
||||
return true
|
||||
},
|
||||
}
|
||||
byteClearAll = []byte("\x15\r")
|
||||
|
||||
wsWriteMutex = &sync.Mutex{}
|
||||
)
|
||||
@@ -174,7 +173,7 @@ func Read(sess *gsession.Session) error {
|
||||
case websocket.TextMessage:
|
||||
chs.InChan <- msg
|
||||
if msg[0] != '9' && ((sess.IsGuacd() && len(msg) > 0) || (!sess.IsGuacd() && IsActive(msg))) {
|
||||
sess.SetIdle()
|
||||
sess.SetIdle() // TODO: performance issue
|
||||
}
|
||||
}
|
||||
} else if sess.SessionType == model.SESSIONTYPE_CLIENT {
|
||||
@@ -183,7 +182,7 @@ func Read(sess *gsession.Session) error {
|
||||
return err
|
||||
}
|
||||
chs.InChan <- p
|
||||
sess.SetIdle()
|
||||
sess.SetIdle() // TODO: performance issue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ func (r *AuthorizationRepository) GetAuthsByAsset(ctx context.Context, asset *mo
|
||||
|
||||
func (r *AuthorizationRepository) GetAuthorizationIds(ctx context.Context, resourceIds []int) ([]*model.AuthorizationIds, error) {
|
||||
var authIds []*model.AuthorizationIds
|
||||
err := r.db.Model(&model.Authorization{}).Find(&authIds).Where("resource_id IN ?", resourceIds).Error
|
||||
err := r.db.Model(&model.Authorization{}).Where("resource_id IN ?", resourceIds).Find(&authIds).Error
|
||||
return authIds, err
|
||||
}
|
||||
|
||||
|
||||
@@ -321,7 +321,6 @@ func getAuthorizations(ctx *gin.Context) (res []*acl.Resource, err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"github.com/veops/oneterm/internal/acl"
|
||||
"github.com/veops/oneterm/internal/api/controller"
|
||||
myConnector "github.com/veops/oneterm/internal/connector"
|
||||
"github.com/veops/oneterm/internal/connector/protocols"
|
||||
"github.com/veops/oneterm/internal/model"
|
||||
"github.com/veops/oneterm/internal/repository"
|
||||
"github.com/veops/oneterm/internal/service"
|
||||
@@ -376,7 +375,7 @@ func (conn *connector) Run() error {
|
||||
}
|
||||
}
|
||||
})
|
||||
protocols.HandleTerm(gsess)
|
||||
myConnector.HandleTerm(gsess, nil)
|
||||
|
||||
if err = gsess.G.Wait(); err != nil {
|
||||
logger.L().Error("sshsrv run stopped", zap.String("sessionId", gsess.SessionId), zap.Error(err))
|
||||
|
||||
@@ -86,8 +86,12 @@ func (ae *ApiError) MessageWithCtx(ctx *gin.Context) string {
|
||||
if ae == nil {
|
||||
return ""
|
||||
}
|
||||
lang := ctx.PostForm("lang")
|
||||
accept := ctx.GetHeader("Accept-Language")
|
||||
lang, accept := "en", "en"
|
||||
if ctx != nil {
|
||||
lang = ctx.PostForm("lang")
|
||||
accept = ctx.GetHeader("Accept-Language")
|
||||
}
|
||||
|
||||
localizer := i18n.NewLocalizer(myi18n.Bundle, lang, accept)
|
||||
return ae.Message(localizer)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user