Files
goproxy/internal/metrics/metrics.go
DarkiT 7efc72b362 增加:
1.  监控指标收集
2.  中间件机制
3.  配置热更新
4.  优雅关闭
5.  插件系统
6.  API文档
7.  认证授权系统
8.  请求/响应压缩优化
2025-03-13 22:58:39 +08:00

388 lines
11 KiB
Go

package metrics
import (
"fmt"
"net/http"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// MetricsCollector is the interface implemented by metric sinks used by the
// proxy. Implementations must be safe for concurrent use, since handlers
// call these methods from many goroutines.
type MetricsCollector interface {
	// IncRequestCount increments the total request counter.
	IncRequestCount()
	// IncErrorCount increments the error counter; implementations may use
	// err to classify the failure.
	IncErrorCount(err error)
	// ObserveRequestDuration records one request duration, in seconds.
	ObserveRequestDuration(seconds float64)
	// IncActiveConnections increments the active-connection gauge.
	IncActiveConnections()
	// DecActiveConnections decrements the active-connection gauge.
	DecActiveConnections()
	// SetBackendHealth records whether the named backend is healthy.
	SetBackendHealth(backend string, healthy bool)
	// SetBackendResponseTime records the latest response time of a backend.
	SetBackendResponseTime(backend string, duration time.Duration)
	// ObserveRequestBytes records the size of one request, in bytes.
	ObserveRequestBytes(bytes int64)
	// ObserveResponseBytes records the size of one response, in bytes.
	ObserveResponseBytes(bytes int64)
	// AddBytesTransferred adds bytes to the counter for the given transfer
	// direction (e.g. "request" or "response").
	AddBytesTransferred(direction string, bytes int64)
	// IncCacheHit increments the cache-hit counter.
	IncCacheHit()
	// GetHandler returns an http.Handler that exposes the collected metrics.
	GetHandler() http.Handler
}
// PrometheusMetrics collects proxy metrics and publishes them through the
// default Prometheus registry.
//
// NOTE(review): promauto registers against the global default registry, so
// constructing PrometheusMetrics more than once per process panics with a
// duplicate-registration error.
type PrometheusMetrics struct {
	// requestTotal counts requests by method, path and status code.
	requestTotal *prometheus.CounterVec
	// requestLatency observes request latency in seconds.
	requestLatency *prometheus.HistogramVec
	// requestSize observes request sizes in bytes.
	requestSize *prometheus.HistogramVec
	// responseSize observes response sizes in bytes.
	responseSize *prometheus.HistogramVec
	// errorTotal counts errors by type.
	errorTotal *prometheus.CounterVec
	// activeConnections tracks the current number of active connections.
	activeConnections prometheus.Gauge
	// connectionPoolSize tracks the connection pool size.
	connectionPoolSize prometheus.Gauge
	// cacheHitRate tracks the cache hit ratio.
	cacheHitRate prometheus.Gauge
	// memoryUsage tracks currently allocated heap bytes.
	memoryUsage prometheus.Gauge
	// done, when closed, stops the background updater goroutine.
	done chan struct{}
	// stopOnce makes Stop safe to call more than once.
	stopOnce sync.Once
	// mu guards any future mutable state; the metric vectors themselves
	// are already safe for concurrent use.
	mu sync.RWMutex
}

// NewPrometheusMetrics creates the collector, registers every metric with
// the default registry, and starts a background goroutine that refreshes
// process-level gauges every 15 seconds. Call Stop to terminate that
// goroutine; the original implementation had no way to stop it, leaking the
// goroutine and its ticker for the life of the process.
func NewPrometheusMetrics() *PrometheusMetrics {
	m := &PrometheusMetrics{
		requestTotal: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "proxy_requests_total",
				Help: "代理请求总数",
			},
			[]string{"method", "path", "status"},
		),
		requestLatency: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "proxy_request_latency_seconds",
				Help:    "代理请求延迟",
				Buckets: prometheus.DefBuckets,
			},
			[]string{"method", "path"},
		),
		requestSize: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "proxy_request_size_bytes",
				Help:    "代理请求大小",
				Buckets: prometheus.ExponentialBuckets(100, 2, 10),
			},
			[]string{"method", "path"},
		),
		responseSize: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "proxy_response_size_bytes",
				Help:    "代理响应大小",
				Buckets: prometheus.ExponentialBuckets(100, 2, 10),
			},
			[]string{"method", "path"},
		),
		errorTotal: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "proxy_errors_total",
				Help: "代理错误总数",
			},
			[]string{"type"},
		),
		activeConnections: promauto.NewGauge(
			prometheus.GaugeOpts{
				Name: "proxy_active_connections",
				Help: "活跃连接数",
			},
		),
		connectionPoolSize: promauto.NewGauge(
			prometheus.GaugeOpts{
				Name: "proxy_connection_pool_size",
				Help: "连接池大小",
			},
		),
		cacheHitRate: promauto.NewGauge(
			prometheus.GaugeOpts{
				Name: "proxy_cache_hit_rate",
				Help: "缓存命中率",
			},
		),
		memoryUsage: promauto.NewGauge(
			prometheus.GaugeOpts{
				Name: "proxy_memory_usage_bytes",
				Help: "内存使用量",
			},
		),
		done: make(chan struct{}),
	}

	// Start the periodic gauge updater; stopped via Stop.
	go m.updateMetrics()

	return m
}

// Stop terminates the background updater goroutine. It is safe to call
// multiple times.
func (m *PrometheusMetrics) Stop() {
	m.stopOnce.Do(func() {
		close(m.done)
	})
}

// updateMetrics periodically refreshes process-level gauges until Stop is
// called.
func (m *PrometheusMetrics) updateMetrics() {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Refresh the allocated-heap gauge.
			var mem runtime.MemStats
			runtime.ReadMemStats(&mem)
			m.memoryUsage.Set(float64(mem.Alloc))
		case <-m.done:
			return
		}
	}
}

// RecordRequest records one completed request: total count, latency, and
// request/response sizes, labeled by method and path.
func (m *PrometheusMetrics) RecordRequest(method, path string, status int, latency time.Duration, reqSize, respSize int64) {
	m.requestTotal.WithLabelValues(method, path, strconv.Itoa(status)).Inc()
	m.requestLatency.WithLabelValues(method, path).Observe(latency.Seconds())
	m.requestSize.WithLabelValues(method, path).Observe(float64(reqSize))
	m.responseSize.WithLabelValues(method, path).Observe(float64(respSize))
}

// RecordError increments the error counter for the given error type.
func (m *PrometheusMetrics) RecordError(errType string) {
	m.errorTotal.WithLabelValues(errType).Inc()
}

// SetActiveConnections sets the active-connection gauge.
func (m *PrometheusMetrics) SetActiveConnections(count int) {
	m.activeConnections.Set(float64(count))
}

// SetConnectionPoolSize sets the connection-pool-size gauge.
func (m *PrometheusMetrics) SetConnectionPoolSize(size int) {
	m.connectionPoolSize.Set(float64(size))
}

// SetCacheHitRate sets the cache-hit-rate gauge.
func (m *PrometheusMetrics) SetCacheHitRate(rate float64) {
	m.cacheHitRate.Set(rate)
}
// SimpleMetrics is a dependency-free MetricsCollector implementation that
// keeps counters in memory and serves them in Prometheus text exposition
// format. Scalar counters are updated atomically; the maps are guarded by mu.
type SimpleMetrics struct {
	// requestCount is the total number of requests (atomic).
	requestCount int64
	// errorCount is the total number of errors (atomic).
	errorCount int64
	// activeConnections is the current number of in-flight connections (atomic).
	activeConnections int64
	// totalResponseTime is the accumulated response time in nanoseconds (atomic).
	totalResponseTime int64
	// bytesTransferred maps a direction ("request"/"response"/...) to bytes moved.
	bytesTransferred map[string]int64
	// backendHealth maps a backend name to its last known health state.
	backendHealth map[string]bool
	// backendResponseTime maps a backend name to its last observed latency.
	backendResponseTime map[string]time.Duration
	// cacheHits is the total number of cache hits (atomic).
	cacheHits int64
	// mu guards the three maps above.
	mu sync.Mutex
}

// NewSimpleMetrics creates a SimpleMetrics with all maps initialized.
func NewSimpleMetrics() *SimpleMetrics {
	return &SimpleMetrics{
		bytesTransferred:    make(map[string]int64),
		backendHealth:       make(map[string]bool),
		backendResponseTime: make(map[string]time.Duration),
	}
}

// IncRequestCount increments the total request counter.
func (m *SimpleMetrics) IncRequestCount() {
	atomic.AddInt64(&m.requestCount, 1)
}

// IncErrorCount increments the error counter. The error value itself is not
// recorded by this implementation.
func (m *SimpleMetrics) IncErrorCount(err error) {
	atomic.AddInt64(&m.errorCount, 1)
}

// ObserveRequestDuration adds one request duration (given in seconds) to the
// accumulated response time, stored in nanoseconds.
func (m *SimpleMetrics) ObserveRequestDuration(seconds float64) {
	nsec := int64(seconds * float64(time.Second))
	atomic.AddInt64(&m.totalResponseTime, nsec)
}

// IncActiveConnections increments the active-connection gauge.
func (m *SimpleMetrics) IncActiveConnections() {
	atomic.AddInt64(&m.activeConnections, 1)
}

// DecActiveConnections decrements the active-connection gauge.
func (m *SimpleMetrics) DecActiveConnections() {
	atomic.AddInt64(&m.activeConnections, -1)
}

// SetBackendHealth records the health state of a backend.
// Fix: the original wrote the map without holding mu, racing with the
// exposition handler (and with concurrent callers, which can panic).
func (m *SimpleMetrics) SetBackendHealth(backend string, healthy bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.backendHealth[backend] = healthy
}

// SetBackendResponseTime records the most recent response time of a backend.
// Fix: same unsynchronized map write as SetBackendHealth.
func (m *SimpleMetrics) SetBackendResponseTime(backend string, duration time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.backendResponseTime[backend] = duration
}

// ObserveRequestBytes adds to the "request" direction byte counter.
func (m *SimpleMetrics) ObserveRequestBytes(bytes int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.bytesTransferred["request"] += bytes
}

// ObserveResponseBytes adds to the "response" direction byte counter.
func (m *SimpleMetrics) ObserveResponseBytes(bytes int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.bytesTransferred["response"] += bytes
}

// AddBytesTransferred adds to the byte counter for an arbitrary direction.
func (m *SimpleMetrics) AddBytesTransferred(direction string, bytes int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.bytesTransferred[direction] += bytes
}

// IncCacheHit increments the cache-hit counter.
func (m *SimpleMetrics) IncCacheHit() {
	atomic.AddInt64(&m.cacheHits, 1)
}

// GetHandler returns an http.Handler that renders the collected metrics in
// Prometheus text exposition format.
//
// Fixes over the original: counters are read with atomic loads and the maps
// are snapshotted under mu (the original read everything unsynchronized),
// and the HELP/TYPE header of each backend metric family is emitted once
// rather than once per map entry (repeating them is invalid exposition
// format).
func (m *SimpleMetrics) GetHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/plain")

		// Snapshot the atomic counters.
		requests := atomic.LoadInt64(&m.requestCount)
		errCount := atomic.LoadInt64(&m.errorCount)
		active := atomic.LoadInt64(&m.activeConnections)
		hits := atomic.LoadInt64(&m.cacheHits)
		totalTime := atomic.LoadInt64(&m.totalResponseTime)

		// Copy the maps under the lock so concurrent writers cannot race
		// with the iteration below.
		m.mu.Lock()
		bytesCopy := make(map[string]int64, len(m.bytesTransferred))
		for k, v := range m.bytesTransferred {
			bytesCopy[k] = v
		}
		healthCopy := make(map[string]bool, len(m.backendHealth))
		for k, v := range m.backendHealth {
			healthCopy[k] = v
		}
		rtCopy := make(map[string]time.Duration, len(m.backendResponseTime))
		for k, v := range m.backendResponseTime {
			rtCopy[k] = v
		}
		m.mu.Unlock()

		// Core counters.
		fmt.Fprintf(w, "# HELP proxy_requests_total 代理请求总数\n")
		fmt.Fprintf(w, "# TYPE proxy_requests_total counter\n")
		fmt.Fprintf(w, "proxy_requests_total %d\n", requests)
		fmt.Fprintf(w, "# HELP proxy_errors_total 代理错误总数\n")
		fmt.Fprintf(w, "# TYPE proxy_errors_total counter\n")
		fmt.Fprintf(w, "proxy_errors_total %d\n", errCount)
		fmt.Fprintf(w, "# HELP proxy_active_connections 当前活跃连接数\n")
		fmt.Fprintf(w, "# TYPE proxy_active_connections gauge\n")
		fmt.Fprintf(w, "proxy_active_connections %d\n", active)

		// Cache hits.
		fmt.Fprintf(w, "# HELP proxy_cache_hits_total 缓存命中总数\n")
		fmt.Fprintf(w, "# TYPE proxy_cache_hits_total counter\n")
		fmt.Fprintf(w, "proxy_cache_hits_total %d\n", hits)

		// Bytes transferred. Each direction has its own metric name, i.e.
		// its own family, so HELP/TYPE per direction is correct here.
		for direction, bytes := range bytesCopy {
			fmt.Fprintf(w, "# HELP proxy_bytes_transferred_%s 代理传输字节数(%s)\n", direction, direction)
			fmt.Fprintf(w, "# TYPE proxy_bytes_transferred_%s counter\n", direction)
			fmt.Fprintf(w, "proxy_bytes_transferred_%s %d\n", direction, bytes)
		}

		// Backend health: one HELP/TYPE for the family, then one sample per
		// backend label.
		if len(healthCopy) > 0 {
			fmt.Fprintf(w, "# HELP proxy_backend_health 后端健康状态\n")
			fmt.Fprintf(w, "# TYPE proxy_backend_health gauge\n")
			for backend, healthy := range healthCopy {
				healthValue := 0
				if healthy {
					healthValue = 1
				}
				fmt.Fprintf(w, "proxy_backend_health{backend=\"%s\"} %d\n", backend, healthValue)
			}
		}

		// Backend response times, in seconds.
		if len(rtCopy) > 0 {
			fmt.Fprintf(w, "# HELP proxy_backend_response_time 后端响应时间\n")
			fmt.Fprintf(w, "# TYPE proxy_backend_response_time gauge\n")
			for backend, duration := range rtCopy {
				fmt.Fprintf(w, "proxy_backend_response_time{backend=\"%s\"} %f\n", backend, duration.Seconds())
			}
		}

		// Derived average response time across all requests, in seconds.
		if requests > 0 {
			avgTime := float64(totalTime) / float64(requests) / float64(time.Second)
			fmt.Fprintf(w, "# HELP proxy_average_response_time 平均响应时间\n")
			fmt.Fprintf(w, "# TYPE proxy_average_response_time gauge\n")
			fmt.Fprintf(w, "proxy_average_response_time %f\n", avgTime)
		}
	})
}
// MetricsMiddleware records per-request metrics into a MetricsCollector.
type MetricsMiddleware struct {
	// metrics receives the per-request observations.
	metrics MetricsCollector
}

// NewMetricsMiddleware creates a middleware backed by the given collector.
func NewMetricsMiddleware(metrics MetricsCollector) *MetricsMiddleware {
	return &MetricsMiddleware{
		metrics: metrics,
	}
}

// Middleware wraps next so that every request is counted, tracked as an
// active connection, and has its duration and response size recorded.
//
// Fix: the original wrapped the writer to capture the status code and byte
// count but never reported them, and only ever called
// ObserveRequestDuration — request count, active connections, response
// bytes, and server errors were silently dropped.
func (m *MetricsMiddleware) Middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		m.metrics.IncRequestCount()
		m.metrics.IncActiveConnections()
		defer m.metrics.DecActiveConnections()

		// Wrap the response writer to capture the status code and the
		// number of body bytes written.
		rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}

		// Continue with the wrapped handler chain.
		next.ServeHTTP(rw, r)

		// Record the observations for this request.
		m.metrics.ObserveRequestDuration(time.Since(start).Seconds())
		m.metrics.ObserveResponseBytes(rw.written)
		if rw.statusCode >= http.StatusInternalServerError {
			m.metrics.IncErrorCount(fmt.Errorf("upstream returned status %d", rw.statusCode))
		}
	})
}
// responseWriter 包装的响应写入器
type responseWriter struct {
http.ResponseWriter
statusCode int
written int64
}
// WriteHeader 写入状态码
func (rw *responseWriter) WriteHeader(statusCode int) {
rw.statusCode = statusCode
rw.ResponseWriter.WriteHeader(statusCode)
}
// Write 写入数据
func (rw *responseWriter) Write(b []byte) (int, error) {
n, err := rw.ResponseWriter.Write(b)
rw.written += int64(n)
return n, err
}
// Flush 刷新数据
func (rw *responseWriter) Flush() {
if flusher, ok := rw.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
}