health check support

This commit is contained in:
liuzhihang1
2025-02-16 00:53:48 +08:00
parent 96174f200d
commit 7343f45dda
2 changed files with 37 additions and 5 deletions

View File

@@ -28,6 +28,7 @@ type configuration struct {
KillWaitTime int `default:"5" describe:"kill信号等待时间"` KillWaitTime int `default:"5" describe:"kill信号等待时间"`
TaskTimeout int `default:"60" describe:"任务执行超时时间(秒)"` TaskTimeout int `default:"60" describe:"任务执行超时时间(秒)"`
TokenExpirationTime int64 `default:"720" describe:"token过期时间小时"` TokenExpirationTime int64 `default:"720" describe:"token过期时间小时"`
WsHealthCheckInterval int `default:"3" describe:"ws主动健康检查间隔"`
CgroupPeriod int64 `default:"100000" describe:"CgroupPeriod"` CgroupPeriod int64 `default:"100000" describe:"CgroupPeriod"`
CgroupSwapLimit bool `default:"false" describe:"cgroup swap限制"` CgroupSwapLimit bool `default:"false" describe:"cgroup swap限制"`
CondWaitTime int `default:"30" describe:"长轮询等待时间(秒)"` CondWaitTime int `default:"30" describe:"长轮询等待时间(秒)"`

View File

@@ -2,6 +2,7 @@ package api
import ( import (
"context" "context"
"sync"
"time" "time"
"github.com/lzh-1625/go_process_manager/config" "github.com/lzh-1625/go_process_manager/config"
@@ -21,14 +22,19 @@ var WsApi = new(wsApi)
type WsConnetInstance struct { type WsConnetInstance struct {
WsConnect *websocket.Conn WsConnect *websocket.Conn
wsLock sync.Mutex
CancelFunc context.CancelFunc CancelFunc context.CancelFunc
} }
func (w *WsConnetInstance) Write(b []byte) { func (w *WsConnetInstance) Write(b []byte) {
w.wsLock.Lock()
defer w.wsLock.Unlock()
w.WsConnect.WriteMessage(websocket.BinaryMessage, b) w.WsConnect.WriteMessage(websocket.BinaryMessage, b)
} }
func (w *WsConnetInstance) WriteString(s string) { func (w *WsConnetInstance) WriteString(s string) {
w.wsLock.Lock()
defer w.wsLock.Unlock()
w.WsConnect.WriteMessage(websocket.TextMessage, []byte(s)) w.WsConnect.WriteMessage(websocket.TextMessage, []byte(s))
} }
func (w *WsConnetInstance) Cancel() { func (w *WsConnetInstance) Cancel() {
@@ -62,10 +68,12 @@ func (w *wsApi) WebsocketHandle(ctx *gin.Context) {
wci := &WsConnetInstance{ wci := &WsConnetInstance{
WsConnect: conn, WsConnect: conn,
CancelFunc: cancel, CancelFunc: cancel,
wsLock: sync.Mutex{},
} }
proc.ReadCache(wci) proc.ReadCache(wci)
w.startWsConnect(conn, cancel, proc, hasOprPermission(ctx, uuid, constants.OPERATION_TERMINAL_WRITE)) w.startWsConnect(wci, cancel, proc, hasOprPermission(ctx, uuid, constants.OPERATION_TERMINAL_WRITE))
proc.AddConn(reqUser, wci) proc.AddConn(reqUser, wci)
defer middle.ProcessWaitCond.Trigger()
defer proc.DeleteConn(reqUser) defer proc.DeleteConn(reqUser)
conn.SetCloseHandler(func(_ int, _ string) error { conn.SetCloseHandler(func(_ int, _ string) error {
middle.ProcessWaitCond.Trigger() middle.ProcessWaitCond.Trigger()
@@ -84,20 +92,43 @@ func (w *wsApi) WebsocketHandle(ctx *gin.Context) {
conn.Close() conn.Close()
} }
func (w *wsApi) startWsConnect(conn *websocket.Conn, cancel context.CancelFunc, proc logic.Process, write bool) { func (w *wsApi) startWsConnect(wci *WsConnetInstance, cancel context.CancelFunc, proc logic.Process, write bool) {
log.Logger.Debugw("ws读取线程已启动") log.Logger.Debugw("ws读取线程已启动")
go func() { go func() {
for { for {
_, b, err := conn.ReadMessage() _, b, err := wci.WsConnect.ReadMessage()
if err != nil { if err != nil {
log.Logger.Debugw("ws读取线程已退出", "info", err) log.Logger.Debugw("ws读取线程已退出", "info", err)
cancel()
return return
} }
if write { if write {
proc.WriteBytes(b) proc.WriteBytes(b)
continue
} }
} }
}() }()
// proactive health check
pongChan := make(chan struct{})
wci.WsConnect.SetPongHandler(func(appData string) error {
pongChan <- struct{}{}
return nil
})
timer := time.NewTicker(time.Second * time.Duration(config.CF.WsHealthCheckInterval))
go func() {
defer timer.Stop()
for {
wci.wsLock.Lock()
wci.WsConnect.WriteMessage(websocket.PingMessage, nil)
wci.wsLock.Unlock()
select {
case <-pongChan:
timer.Reset(time.Second * time.Duration(config.CF.WsHealthCheckInterval))
case <-timer.C:
log.Logger.Debugw("pong报文超时,结束ws连接")
cancel()
return
}
}
}()
} }