mirror of
				https://github.com/lzh-1625/go_process_manager.git
				synced 2025-10-31 11:26:49 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			102 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package api
 | ||
| 
 | ||
| import (
 | ||
| 	"context"
 | ||
| 	"time"
 | ||
| 
 | ||
| 	"github.com/lzh-1625/go_process_manager/internal/app/constants"
 | ||
| 	"github.com/lzh-1625/go_process_manager/internal/app/middle"
 | ||
| 	"github.com/lzh-1625/go_process_manager/internal/app/service"
 | ||
| 	"github.com/lzh-1625/go_process_manager/log"
 | ||
| 	"github.com/lzh-1625/go_process_manager/utils"
 | ||
| 
 | ||
| 	"github.com/gin-gonic/gin"
 | ||
| 	"github.com/gorilla/websocket"
 | ||
| )
 | ||
| 
 | ||
// wsApi is the receiver type for the WebSocket HTTP handlers in this package.
// It has no fields.
type wsApi struct{}

// WsApi is the shared package-level instance whose methods are used as
// handlers.
var WsApi = &wsApi{}
 | ||
| 
 | ||
| type WsConnetInstance struct {
 | ||
| 	WsConnect  *websocket.Conn
 | ||
| 	CancelFunc context.CancelFunc
 | ||
| }
 | ||
| 
 | ||
| func (w *WsConnetInstance) Write(b []byte) {
 | ||
| 	w.WsConnect.WriteMessage(websocket.BinaryMessage, b)
 | ||
| }
 | ||
| 
 | ||
| func (w *WsConnetInstance) WriteString(s string) {
 | ||
| 	w.WsConnect.WriteMessage(websocket.TextMessage, []byte(s))
 | ||
| }
 | ||
| func (w *WsConnetInstance) Cancel() {
 | ||
| 	w.CancelFunc()
 | ||
| }
 | ||
| 
 | ||
| var upgrader = websocket.Upgrader{
 | ||
| 	ReadBufferSize:  1024,
 | ||
| 	WriteBufferSize: 1024,
 | ||
| }
 | ||
| 
 | ||
| func (w *wsApi) WebsocketHandle(ctx *gin.Context) {
 | ||
| 	reqUser := getUserName(ctx)
 | ||
| 	uuid := getQueryInt(ctx, "uuid")
 | ||
| 	proc, err := service.ProcessCtlService.GetProcess(uuid)
 | ||
| 	errCheck(ctx, err != nil, "Operation failed!")
 | ||
| 	errCheck(ctx, proc.HasWsConn(reqUser), "A connection already exists; unable to establish a new one!")
 | ||
| 	errCheck(ctx, proc.State.State != 1, "The process is currently running.")
 | ||
| 	errCheck(ctx, proc.Control.Controller != reqUser && !proc.VerifyControl(), "Insufficient permissions; please check your access rights!")
 | ||
| 	conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
 | ||
| 	errCheck(ctx, err != nil, "WebSocket connection upgrade failed!")
 | ||
| 
 | ||
| 	log.Logger.AddAdditionalInfo("processName", proc.Name)
 | ||
| 	log.Logger.AddAdditionalInfo("userName", reqUser)
 | ||
| 	defer log.Logger.DeleteAdditionalInfo(2)
 | ||
| 
 | ||
| 	log.Logger.Infow("ws连接成功")
 | ||
| 
 | ||
| 	proc.SetTerminalSize(utils.GetIntByString(ctx.Query("cols")), utils.GetIntByString(ctx.Query("rows")))
 | ||
| 	wsCtx, cancel := context.WithCancel(context.Background())
 | ||
| 	wci := &WsConnetInstance{
 | ||
| 		WsConnect:  conn,
 | ||
| 		CancelFunc: cancel,
 | ||
| 	}
 | ||
| 	proc.ReadCache(wci)
 | ||
| 	w.startWsConnect(conn, proc, hasOprPermission(ctx, uuid, constants.OPERATION_TERMINAL_WRITE))
 | ||
| 	proc.AddConn(reqUser, wci)
 | ||
| 	defer proc.DeleteConn(reqUser)
 | ||
| 	conn.SetCloseHandler(func(_ int, _ string) error {
 | ||
| 		middle.ProcessWaitCond.Trigger()
 | ||
| 		cancel()
 | ||
| 		return nil
 | ||
| 	})
 | ||
| 	middle.ProcessWaitCond.Trigger()
 | ||
| 	select {
 | ||
| 	case <-proc.StopChan:
 | ||
| 		log.Logger.Infow("ws连接断开", "操作类型", "进程已停止,强制断开ws连接")
 | ||
| 	case <-time.After(time.Minute * 10):
 | ||
| 		log.Logger.Infow("ws连接断开", "操作类型", "连接时间超过最大时长限制")
 | ||
| 	case <-wsCtx.Done():
 | ||
| 		log.Logger.Infow("ws连接断开", "操作类型", "tcp连接建立已被关闭")
 | ||
| 	}
 | ||
| 	conn.Close()
 | ||
| }
 | ||
| 
 | ||
| func (w *wsApi) startWsConnect(conn *websocket.Conn, proc service.Process, write bool) {
 | ||
| 	log.Logger.Debugw("ws读取线程已启动")
 | ||
| 	go func() {
 | ||
| 		for {
 | ||
| 			_, b, err := conn.ReadMessage()
 | ||
| 			if err != nil {
 | ||
| 				log.Logger.Debugw("ws读取线程已退出", "info", err)
 | ||
| 				return
 | ||
| 			}
 | ||
| 			if write {
 | ||
| 				proc.WriteBytes(b)
 | ||
| 				continue
 | ||
| 			}
 | ||
| 		}
 | ||
| 	}()
 | ||
| }
 | 
