From 7343f45dda2ad88fccb402c50a7f15311b380a19 Mon Sep 17 00:00:00 2001 From: liuzhihang1 Date: Sun, 16 Feb 2025 00:53:48 +0800 Subject: [PATCH] health check support --- config/config.go | 1 + internal/app/api/ws.go | 41 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/config/config.go b/config/config.go index 1aebabd..639f2cb 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,7 @@ type configuration struct { KillWaitTime int `default:"5" describe:"kill信号等待时间(秒)"` TaskTimeout int `default:"60" describe:"任务执行超时时间(秒)"` TokenExpirationTime int64 `default:"720" describe:"token过期时间(小时)"` + WsHealthCheckInterval int `default:"3" describe:"ws主动健康检查间隔(秒)"` CgroupPeriod int64 `default:"100000" describe:"CgroupPeriod"` CgroupSwapLimit bool `default:"false" describe:"cgroup swap限制"` CondWaitTime int `default:"30" describe:"长轮询等待时间(秒)"` diff --git a/internal/app/api/ws.go b/internal/app/api/ws.go index 5dc261a..2e1d8a0 100644 --- a/internal/app/api/ws.go +++ b/internal/app/api/ws.go @@ -2,6 +2,7 @@ package api import ( "context" + "sync" "time" "github.com/lzh-1625/go_process_manager/config" @@ -21,14 +22,19 @@ var WsApi = new(wsApi) type WsConnetInstance struct { WsConnect *websocket.Conn + wsLock sync.Mutex CancelFunc context.CancelFunc } func (w *WsConnetInstance) Write(b []byte) { + w.wsLock.Lock() + defer w.wsLock.Unlock() w.WsConnect.WriteMessage(websocket.BinaryMessage, b) } func (w *WsConnetInstance) WriteString(s string) { + w.wsLock.Lock() + defer w.wsLock.Unlock() w.WsConnect.WriteMessage(websocket.TextMessage, []byte(s)) } func (w *WsConnetInstance) Cancel() { @@ -62,10 +68,12 @@ func (w *wsApi) WebsocketHandle(ctx *gin.Context) { wci := &WsConnetInstance{ WsConnect: conn, CancelFunc: cancel, + wsLock: sync.Mutex{}, } proc.ReadCache(wci) - w.startWsConnect(conn, cancel, proc, hasOprPermission(ctx, uuid, constants.OPERATION_TERMINAL_WRITE)) + w.startWsConnect(wci, cancel, proc, hasOprPermission(ctx, uuid, constants.OPERATION_TERMINAL_WRITE)) proc.AddConn(reqUser, wci) + defer middle.ProcessWaitCond.Trigger() defer proc.DeleteConn(reqUser) conn.SetCloseHandler(func(_ int, _ string) error { middle.ProcessWaitCond.Trigger() @@ -84,20 +92,43 @@ func (w *wsApi) WebsocketHandle(ctx *gin.Context) { conn.Close() } -func (w *wsApi) startWsConnect(conn *websocket.Conn, cancel context.CancelFunc, proc logic.Process, write bool) { +func (w *wsApi) startWsConnect(wci *WsConnetInstance, cancel context.CancelFunc, proc logic.Process, write bool) { log.Logger.Debugw("ws读取线程已启动") go func() { for { - _, b, err := conn.ReadMessage() + _, b, err := wci.WsConnect.ReadMessage() if err != nil { log.Logger.Debugw("ws读取线程已退出", "info", err) - cancel() return } if write { proc.WriteBytes(b) - continue } } }() + + // proactive health check + pongChan := make(chan struct{}) + wci.WsConnect.SetPongHandler(func(appData string) error { + pongChan <- struct{}{} + return nil + }) + timer := time.NewTicker(time.Second * time.Duration(config.CF.WsHealthCheckInterval)) + go func() { + defer timer.Stop() + for { + wci.wsLock.Lock() + wci.WsConnect.WriteMessage(websocket.PingMessage, nil) + wci.wsLock.Unlock() + select { + case <-pongChan: + timer.Reset(time.Second * time.Duration(config.CF.WsHealthCheckInterval)) + case <-timer.C: + log.Logger.Debugw("pong报文超时,结束ws连接") + cancel() + return + } + } + }() + }