修复静态服务发现健康检查的一系列问题

This commit is contained in:
Liujian
2025-06-16 11:04:41 +08:00
parent 6b7a2afd21
commit e4d364c393
5 changed files with 23 additions and 56 deletions

View File

@@ -3,6 +3,5 @@ package discovery
// IHealthChecker 健康检查接口
type IHealthChecker interface {
Check(nodes INodes)
Reset(conf interface{}) error
Stop()
}

View File

@@ -35,30 +35,21 @@ func (s *HeathCheckHandler) reset(cfg *Config) error {
}
return nil
}
checker := s.checker
if checker == nil {
checker = health_check_http.NewHTTPCheck(
health_check_http.Config{
Protocol: cfg.Health.Scheme,
Method: cfg.Health.Method,
URL: cfg.Health.URL,
SuccessCode: cfg.Health.SuccessCode,
Period: time.Duration(cfg.Health.Period) * time.Second,
Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond,
})
checker.Check(s.nodes)
} else {
_ = checker.Reset(
health_check_http.Config{
Protocol: cfg.Health.Scheme,
Method: cfg.Health.Method,
URL: cfg.Health.URL,
SuccessCode: cfg.Health.SuccessCode,
Period: time.Duration(cfg.Health.Period) * time.Second,
Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond,
},
)
checker := health_check_http.NewHTTPCheck(
health_check_http.Config{
Protocol: cfg.Health.Scheme,
Method: cfg.Health.Method,
URL: cfg.Health.URL,
SuccessCode: cfg.Health.SuccessCode,
Period: time.Duration(cfg.Health.Period) * time.Second,
Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond,
})
checker.Check(s.nodes)
if s.checker != nil {
s.checker.Stop()
}
s.checker = checker
return nil

View File

@@ -86,9 +86,9 @@ func (h *HttpComplete) Complete(org eocontext.EoContext) error {
}
lastErr = ctx.SendTo(scheme, node, balanceTimeout)
if lastErr == nil {
return nil
}
node.Down()
log.Error("http upstream send error: ", lastErr)
}

View File

@@ -39,7 +39,6 @@ type HTTPCheck struct {
nodes discovery.INodes
ctx context.Context
cancel context.CancelFunc
client *http.Client
locker sync.RWMutex
}
@@ -54,6 +53,7 @@ func (h *HTTPCheck) doCheckLoop(nodes discovery.INodes) {
return
}
ticker := time.NewTicker(h.config.Period)
//timer := time.NewTimer(h.config.Period)
defer ticker.Stop()
for {
@@ -62,26 +62,12 @@ func (h *HTTPCheck) doCheckLoop(nodes discovery.INodes) {
return
case <-ticker.C:
{
h.check(nodes)
}
}
}
}
// Reset 重置HTTPCheck的配置
func (h *HTTPCheck) Reset(conf interface{}) error {
cf, ok := conf.(Config)
if !ok {
return nil
}
h.reset(&cf)
return nil
}
func (h *HTTPCheck) reset(conf *Config) {
h.config = conf
}
// Stop 停止HTTPCheck中止定时检查
func (h *HTTPCheck) Stop() {
h.cancel()
@@ -96,24 +82,27 @@ func (h *HTTPCheck) check(nodes discovery.INodes) {
失败则下次定时检查再进行检测
*/
for _, ns := range nodes.All() {
if ns.Status() != discovery.Down {
continue
}
//if ns.Status() != discovery.Down {
// continue
//}
uri := fmt.Sprintf("%s://%s/%s", h.config.Protocol, strings.TrimSuffix(ns.Addr(), "/"), strings.TrimPrefix(h.config.URL, "/"))
h.client.Timeout = h.config.Timeout
request, err := http.NewRequest(h.config.Method, uri, nil)
if err != nil {
log.Error(err)
ns.Down()
continue
}
resp, err := h.client.Do(request)
if err != nil {
log.Error(err)
ns.Down()
continue
}
resp.Body.Close()
if h.config.SuccessCode != resp.StatusCode {
log.Error(err)
ns.Down()
continue
}
ns.Up()

View File

@@ -92,24 +92,12 @@ func (r *ProxyRequest) reset(request *fasthttp.Request, remoteAddr string) {
r.req.Header.Set("x-forwarded-for", r.remoteAddr)
}
}
//if len(forwardedFor) > 0 {
// r.req.Header.SetProvider("x-forwarded-for", fmt.Sprint(string(forwardedFor), ", ", r.remoteAddr))
//} else {
// r.req.Header.SetProvider("x-forwarded-for", r.remoteAddr)
//}
if r.realIP != "0.0.0.0" {
r.req.Header.Set("x-real-ip", r.realIP)
}
}
//func NewProxyRequest(request *fasthttp.Request, remoteAddr string) *ProxyRequest {
// proxyRequest := fasthttp.AcquireRequest()
// request.CopyTo(proxyRequest)
// return &ProxyRequest{
// RequestReader: NewRequestReader(proxyRequest, remoteAddr),
// }
//}
func (r *ProxyRequest) SetMethod(s string) {
r.Request().Header.SetMethod(s)
}