diff --git a/config/config.go b/config/config.go index 1aebabd..0dcabb1 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,7 @@ type configuration struct { KillWaitTime int `default:"5" describe:"kill信号等待时间(秒)"` TaskTimeout int `default:"60" describe:"任务执行超时时间(秒)"` TokenExpirationTime int64 `default:"720" describe:"token过期时间(小时)"` + WsProactiveHealthCheck bool `default:"false" describe:"ws主动健康检查"` CgroupPeriod int64 `default:"100000" describe:"CgroupPeriod"` CgroupSwapLimit bool `default:"false" describe:"cgroup swap限制"` CondWaitTime int `default:"30" describe:"长轮询等待时间(秒)"` diff --git a/internal/app/api/ws.go b/internal/app/api/ws.go index 12e5668..442f29f 100644 --- a/internal/app/api/ws.go +++ b/internal/app/api/ws.go @@ -64,7 +64,7 @@ func (w *wsApi) WebsocketHandle(ctx *gin.Context) { CancelFunc: cancel, } proc.ReadCache(wci) - w.startWsConnect(conn, proc, hasOprPermission(ctx, uuid, constants.OPERATION_TERMINAL_WRITE)) + w.startWsConnect(conn, cancel, proc, hasOprPermission(ctx, uuid, constants.OPERATION_TERMINAL_WRITE)) proc.AddConn(reqUser, wci) defer proc.DeleteConn(reqUser) conn.SetCloseHandler(func(_ int, _ string) error { @@ -84,14 +84,14 @@ func (w *wsApi) WebsocketHandle(ctx *gin.Context) { conn.Close() } -func (w *wsApi) startWsConnect(conn *websocket.Conn, proc logic.Process, write bool) { +func (w *wsApi) startWsConnect(conn *websocket.Conn, cancel context.CancelFunc, proc logic.Process, write bool) { log.Logger.Debugw("ws读取线程已启动") go func() { for { _, b, err := conn.ReadMessage() if err != nil { log.Logger.Debugw("ws读取线程已退出", "info", err) - conn.Close() + cancel() return } if write { @@ -100,22 +100,31 @@ func (w *wsApi) startWsConnect(conn *websocket.Conn, proc logic.Process, write b } } }() - // // health check - // pongChan := make(chan struct{}) - // conn.SetPongHandler(func(appData string) error { - // pongChan <- struct{}{} - // return nil - // }) - // timer := time.NewTicker(time.Second) - // go func() { - // for { - // conn.WriteMessage(websocket.PingMessage, nil) - // select { - // case <-pongChan: - // timer.Reset(time.Second) - // case <-timer.C: - // conn.Close() - // } - // } - // }() + if config.CF.WsProactiveHealthCheck { + w.proactiveHealthCheck(conn, cancel) + } +} + +func (w *wsApi) proactiveHealthCheck(conn *websocket.Conn, cancel context.CancelFunc) { + pongChan := make(chan struct{}) + conn.SetPongHandler(func(appData string) error { + pongChan <- struct{}{} + return nil + }) + timer := time.NewTimer(time.Second) + go func() { + defer timer.Stop() + for { + conn.WriteMessage(websocket.PingMessage, nil) + select { + case <-pongChan: + timer.Reset(time.Second) + case <-timer.C: + log.Logger.Debugw("pong报文超时,结束ws连接") + cancel() + return + } + time.Sleep(time.Second * 3) + } + }() }