diff --git a/backend/internal/connector/handlers.go b/backend/internal/connector/handlers.go index f992640..752ec9f 100644 --- a/backend/internal/connector/handlers.go +++ b/backend/internal/connector/handlers.go @@ -399,9 +399,6 @@ func HandleTerm(sess *gsession.Session, ctx *gin.Context) (err error) { protocols.WriteErrMsg(sess, msg) logger.L().Info("closed by", zap.String("admin", closeBy)) return &myErrors.ApiError{Code: myErrors.ErrAdminClose, Data: map[string]any{"admin": closeBy}} - case err = <-chs.ErrChan: - protocols.WriteErrMsg(sess, err.Error()) - return case in := <-chs.InChan: if sess.SessionType == model.SESSIONTYPE_WEB { rt := in[0] @@ -455,6 +452,5 @@ func HandleTerm(sess *gsession.Session, ctx *gin.Context) (err error) { if err = sess.G.Wait(); err != nil { logger.L().Debug("handle term wait end", zap.String("id", sess.SessionId), zap.Error(err)) } - return } diff --git a/backend/internal/connector/protocols/telnet.go b/backend/internal/connector/protocols/telnet.go index fd89377..b3460e8 100644 --- a/backend/internal/connector/protocols/telnet.go +++ b/backend/internal/connector/protocols/telnet.go @@ -68,7 +68,6 @@ func ConnectTelnet(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, logger.L().Error("telnet dial failed", zap.Error(err)) return } - defer conn.Close() // Setup authentication control mechanisms authDone := make(chan bool, 1) @@ -205,6 +204,7 @@ func ConnectTelnet(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, // Data flow from client to server // Reads from input pipe and writes to telnet connection sess.G.Go(func() error { + buf := make([]byte, 1024) for { select { @@ -214,10 +214,11 @@ func ConnectTelnet(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, n, err := chs.Rin.Read(buf) if err != nil { if err == io.EOF { - continue + conn.Close() + return nil } if err.Error() == "io: read/write on closed pipe" { - return nil // Normal exit condition, not an error + return nil } logger.L().Error("read from input pipe failed", zap.Error(err)) return err @@ -226,7 +227,7 @@ func ConnectTelnet(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, if n > 0 { _, err = conn.Write(buf[:n]) if err != nil { - logger.L().Error("write to telnet failed", zap.Error(err)) + logger.L().Info("write to telnet failed, connection likely closed", zap.Error(err)) return err } } @@ -237,6 +238,17 @@ func ConnectTelnet(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, // Data flow from server to client // Reads from telnet connection, processes telnet protocol, and sends to output channel sess.G.Go(func() error { + defer func() { + conn.Close() + if chs.Rin != nil { + chs.Rin.Close() + } + // Close AwayChan to signal connection end + sess.Once.Do(func() { + close(chs.AwayChan) + }) + }() + buf := make([]byte, 8192) for { select { @@ -253,11 +265,10 @@ func ConnectTelnet(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, continue } if err == io.EOF { - logger.L().Info("telnet connection closed by server") return fmt.Errorf("telnet connection closed") } if strings.Contains(err.Error(), "use of closed network connection") { - return nil // Normal connection close, not an error + return nil } logger.L().Error("read from telnet failed", zap.Error(err)) return err @@ -275,8 +286,6 @@ func ConnectTelnet(ctx *gin.Context, sess *gsession.Session, asset *model.Asset, // Signal successful connection chs.ErrChan <- nil - // Wait for all goroutines to complete - err = sess.G.Wait() return nil } diff --git a/backend/internal/connector/protocols/utils.go b/backend/internal/connector/protocols/utils.go index a0d16de..47d6e23 100644 --- a/backend/internal/connector/protocols/utils.go +++ b/backend/internal/connector/protocols/utils.go @@ -158,6 +158,39 @@ func Write(sess *gsession.Session, skipRecording ...bool) (err error) { // Read reads data from the session input func Read(sess *gsession.Session) error { chs := sess.Chans + + // Handle CLIENT type with non-blocking reads + if sess.SessionType == model.SESSIONTYPE_CLIENT { + readChan := make(chan []byte) + errChan := make(chan error) + + go func() { + for { + p, err := sess.CliRw.Read() + if err != nil { + errChan <- err + return + } + readChan <- p + } + }() + + for { + select { + case <-sess.Gctx.Done(): + return nil + case <-sess.Chans.AwayChan: + return nil + case err := <-errChan: + return err + case p := <-readChan: + chs.InChan <- p + sess.SetIdle() + } + } + } + + // Original logic for WEB type for { select { case <-sess.Gctx.Done(): @@ -180,13 +213,6 @@ func Read(sess *gsession.Session) error { sess.SetIdle() // TODO: performance issue } } - } else if sess.SessionType == model.SESSIONTYPE_CLIENT { - p, err := sess.CliRw.Read() - if err != nil { - return err - } - chs.InChan <- p - sess.SetIdle() // TODO: performance issue } } } diff --git a/backend/internal/sshsrv/view.go b/backend/internal/sshsrv/view.go index 5095b9c..31a9451 100644 --- a/backend/internal/sshsrv/view.go +++ b/backend/internal/sshsrv/view.go @@ -512,6 +512,7 @@ func (m *view) handleConnectionCommand(cmd string) tea.Cmd { m.Ctx.Params = append(m.Ctx.Params, gin.Param{Key: "asset_id", Value: cast.ToString(m.combines[cmd][1])}) m.Ctx.Params = append(m.Ctx.Params, gin.Param{Key: "protocol", Value: fmt.Sprintf("%s:%d", p, m.combines[cmd][2])}) m.Ctx = m.Ctx.Copy() + m.Ctx.Set("sessionType", model.SESSIONTYPE_CLIENT) m.connecting = true return tea.Sequence( @@ -758,8 +759,14 @@ func (conn *connector) Run() error { r, w := io.Pipe() go func() { + defer w.Close() _, err := io.Copy(w, conn.stdin) - gsess.Chans.ErrChan <- err + // Don't block on sending error - HandleTerm may have already returned + select { + case gsess.Chans.ErrChan <- err: + default: + // Channel is closed or no one is listening, just return + } }() gsess.CliRw = &session.CliRW{ @@ -784,7 +791,13 @@ func (conn *connector) Run() error { case <-gsess.Gctx.Done(): return case w := <-ch: - gsess.Chans.WindowChan <- w + // Non-blocking send to WindowChan + // Some protocols (like telnet) don't handle window changes + select { + case gsess.Chans.WindowChan <- w: + default: + // If no one is listening, just ignore + } } } })