mirror of
https://github.com/lzh-1625/go_process_manager.git
synced 2025-09-28 21:02:11 +08:00
97 lines
2.3 KiB
Go
97 lines
2.3 KiB
Go
package api
|
||
|
||
import (
	"context"
	"msm/consts/ctxflag"
	"msm/log"
	"msm/service/process"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)
|
||
|
||
// wsApi is the stateless receiver for the websocket HTTP handlers in this
// package.
type wsApi struct{}

// WsApi is the package-level wsApi instance.
var WsApi = &wsApi{}
|
||
|
||
var upgrader = websocket.Upgrader{
|
||
ReadBufferSize: 1024,
|
||
WriteBufferSize: 1024,
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
return true
|
||
},
|
||
}
|
||
|
||
func (w *wsApi) WebsocketHandle(ctx *gin.Context) {
|
||
reqUser := ctx.GetString(ctxflag.USER_NAME)
|
||
uuid, err := strconv.Atoi(ctx.Query("uuid"))
|
||
errCheck(ctx, err != nil, "参数有误")
|
||
proc, err := process.ProcessCtlService.GetProcess(uuid)
|
||
errCheck(ctx, err != nil, "进程获取失败")
|
||
errCheck(ctx, proc.GetStateState() != 1, "进程未运行")
|
||
errCheck(ctx, proc.GetControlController() != reqUser && !proc.VerifyControl(), "进程权限不足")
|
||
errCheck(ctx, !proc.TryLock(), "进程已被占用")
|
||
proc.SetWhoUsing(reqUser)
|
||
conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
|
||
errCheck(ctx, err != nil, "ws升级失败")
|
||
log.Logger.Infow("ws连接成功", "进程名称", proc.GetName(), "连接者", proc.GetWhoUsing())
|
||
proc.SetControlController("")
|
||
wsCtx, cancel := context.WithCancel(context.Background())
|
||
w.startWsConnect(conn, proc, cancel)
|
||
proc.SetWsConn(conn)
|
||
proc.SetIsUsing(true)
|
||
close := func(err string) {
|
||
proc.SetWhoUsing("")
|
||
proc.SetIsUsing(false)
|
||
proc.SetWsConn(nil)
|
||
conn.Close()
|
||
proc.Unlock()
|
||
log.Logger.Infow("ws连接断开", "操作类型", err, "进程名称", proc.GetName())
|
||
}
|
||
conn.SetCloseHandler(func(_ int, _ string) error {
|
||
proc.ChangControlChan() <- 1
|
||
close("ws连接被断开")
|
||
return nil
|
||
})
|
||
select {
|
||
case signal := <-proc.ChangControlChan():
|
||
{
|
||
if signal == 0 {
|
||
close("强制断开ws连接")
|
||
}
|
||
}
|
||
case <-proc.StopChan():
|
||
{
|
||
close("进程已停止,强制断开ws连接")
|
||
}
|
||
case <-time.After(time.Minute * 10):
|
||
{
|
||
close("连接时间超过最大时长限制")
|
||
}
|
||
case <-wsCtx.Done():
|
||
{
|
||
close("tcp连接建立已被关闭")
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
func (w *wsApi) startWsConnect(conn *websocket.Conn, proc process.Process, cancel context.CancelFunc) {
|
||
proc.ReadCache(conn)
|
||
log.Logger.Debugw("ws读取线程已启动")
|
||
go func() {
|
||
for {
|
||
_, b, err := conn.ReadMessage()
|
||
if err != nil {
|
||
log.Logger.Debugw("ws读取线程已退出", "info", err)
|
||
cancel()
|
||
return
|
||
}
|
||
proc.WriteBytes(b)
|
||
}
|
||
}()
|
||
}
|