// mq/monitoring.go

package mq
import (
"context"
"encoding/json"
"fmt"
"net/http"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/oarkflow/mq/logger"
)
// MetricsServer provides comprehensive monitoring and metrics
type MetricsServer struct {
broker *Broker
config *MonitoringConfig
logger logger.Logger
server *http.Server
registry *DetailedMetricsRegistry
healthChecker *SystemHealthChecker
alertManager *AlertManager
isRunning int32
shutdown chan struct{}
wg sync.WaitGroup
}
// DetailedMetricsRegistry stores and manages metrics with enhanced features
type DetailedMetricsRegistry struct {
metrics map[string]*TimeSeries
mu sync.RWMutex
}
// TimeSeries represents a time series metric
type TimeSeries struct {
Name string `json:"name"`
Type MetricType `json:"type"`
Description string `json:"description"`
Labels map[string]string `json:"labels"`
Values []TimeSeriesPoint `json:"values"`
MaxPoints int `json:"max_points"`
mu sync.RWMutex
}
// TimeSeriesPoint represents a single point in a time series
type TimeSeriesPoint struct {
Timestamp time.Time `json:"timestamp"`
Value float64 `json:"value"`
}
// MetricType represents the type of metric
type MetricType string
const (
MetricTypeCounter MetricType = "counter"
MetricTypeGauge MetricType = "gauge"
MetricTypeHistogram MetricType = "histogram"
MetricTypeSummary MetricType = "summary"
)
// SystemHealthChecker monitors system health
type SystemHealthChecker struct {
checks map[string]HealthCheck
results map[string]*HealthCheckResult
mu sync.RWMutex
logger logger.Logger
}
// HealthCheck interface for health checks
type HealthCheck interface {
Name() string
Check(ctx context.Context) *HealthCheckResult
Timeout() time.Duration
}
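// Custom checks can be plugged in via SystemHealthChecker.RegisterCheck. A minimal
// sketch of an implementation (the broker-ping logic, the PingCheck type, and the
// net import are illustrative assumptions, not part of this package):
//
//	type PingCheck struct{ addr string }
//
//	func (p *PingCheck) Name() string           { return "broker_ping" }
//	func (p *PingCheck) Timeout() time.Duration { return 2 * time.Second }
//	func (p *PingCheck) Check(ctx context.Context) *HealthCheckResult {
//		start := time.Now()
//		result := &HealthCheckResult{Name: p.Name(), Status: HealthStatusHealthy, Timestamp: time.Now()}
//		conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", p.addr)
//		if err != nil {
//			result.Status = HealthStatusUnhealthy
//			result.Message = err.Error()
//		} else {
//			conn.Close()
//			result.Message = "broker reachable"
//		}
//		result.Duration = time.Since(start)
//		return result
//	}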
// HealthCheckResult represents the result of a health check
type HealthCheckResult struct {
Name string `json:"name"`
Status HealthStatus `json:"status"`
Message string `json:"message"`
Duration time.Duration `json:"duration"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
}
// HealthStatus represents the health status
type HealthStatus string
const (
HealthStatusHealthy HealthStatus = "healthy"
HealthStatusUnhealthy HealthStatus = "unhealthy"
HealthStatusWarning HealthStatus = "warning"
HealthStatusUnknown HealthStatus = "unknown"
)
// AlertManager manages alerts and notifications
type AlertManager struct {
rules []AlertRule
alerts []ActiveAlert
notifiers []AlertNotifier
mu sync.RWMutex
logger logger.Logger
}
// AlertRule defines conditions for triggering alerts
type AlertRule struct {
Name string `json:"name"`
Metric string `json:"metric"`
Condition string `json:"condition"` // "gt", "lt", "eq", "gte", "lte"
Threshold float64 `json:"threshold"`
Duration time.Duration `json:"duration"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Enabled bool `json:"enabled"`
}
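// An illustrative rule: fire when total queue depth exceeds 10,000. The metric name
// matches one registered by registerDefaultMetrics below; the label and annotation
// values are made up for the example. Note that EvaluateRules currently checks only
// the latest recorded value, so Duration is carried on the rule but not yet enforced.
//
//	rule := AlertRule{
//		Name:        "queue_depth_high",
//		Metric:      "mq_queue_depth",
//		Condition:   "gt",
//		Threshold:   10000,
//		Duration:    time.Minute,
//		Labels:      map[string]string{"severity": "warning"},
//		Annotations: map[string]string{"summary": "queue depth above 10k"},
//		Enabled:     true,
//	}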
// ActiveAlert represents an active alert
type ActiveAlert struct {
Rule AlertRule `json:"rule"`
Value float64 `json:"value"`
StartsAt time.Time `json:"starts_at"`
EndsAt *time.Time `json:"ends_at,omitempty"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Status AlertStatus `json:"status"`
}
// AlertStatus represents the status of an alert
type AlertStatus string
const (
AlertStatusFiring AlertStatus = "firing"
AlertStatusResolved AlertStatus = "resolved"
AlertStatusSilenced AlertStatus = "silenced"
)
// AlertNotifier interface for alert notifications
type AlertNotifier interface {
Notify(ctx context.Context, alert ActiveAlert) error
Name() string
}
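// LogNotifier at the bottom of this file is a ready-made implementation. A sketch of
// a webhook-style notifier (the WebhookNotifier type, endpoint, and payload shape are
// assumptions, not an API defined by this package; it also needs the bytes import):
//
//	type WebhookNotifier struct{ url string }
//
//	func (w *WebhookNotifier) Name() string { return "webhook" }
//	func (w *WebhookNotifier) Notify(ctx context.Context, alert ActiveAlert) error {
//		body, err := json.Marshal(alert)
//		if err != nil {
//			return err
//		}
//		req, err := http.NewRequestWithContext(ctx, http.MethodPost, w.url, bytes.NewReader(body))
//		if err != nil {
//			return err
//		}
//		req.Header.Set("Content-Type", "application/json")
//		resp, err := http.DefaultClient.Do(req)
//		if err != nil {
//			return err
//		}
//		defer resp.Body.Close()
//		if resp.StatusCode >= 300 {
//			return fmt.Errorf("webhook returned %s", resp.Status)
//		}
//		return nil
//	}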
// NewMetricsServer creates a new metrics server
func NewMetricsServer(broker *Broker, config *MonitoringConfig, logger logger.Logger) *MetricsServer {
return &MetricsServer{
broker: broker,
config: config,
logger: logger,
registry: NewDetailedMetricsRegistry(),
healthChecker: NewSystemHealthChecker(logger),
alertManager: NewAlertManager(logger),
shutdown: make(chan struct{}),
}
}
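// Typical wiring (illustrative; assumes the caller already has a *Broker, a
// *MonitoringConfig with MetricsPort and HealthCheckInterval set, a logger.Logger
// named log, and a context.Context named ctx):
//
//	ms := NewMetricsServer(broker, cfg, log)
//	ms.AddAlertNotifier(NewLogNotifier(log))
//	ms.AddAlertRule(AlertRule{
//		Name: "queue_depth_high", Metric: "mq_queue_depth",
//		Condition: "gt", Threshold: 10000, Enabled: true,
//	})
//	if err := ms.Start(ctx); err != nil {
//		log.Error("failed to start metrics server", logger.Field{Key: "error", Value: err.Error()})
//	}
//	defer ms.Stop()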
// NewDetailedMetricsRegistry creates a new detailed metrics registry
func NewDetailedMetricsRegistry() *DetailedMetricsRegistry {
return &DetailedMetricsRegistry{
metrics: make(map[string]*TimeSeries),
}
}
// RegisterMetric registers a new metric
func (mr *DetailedMetricsRegistry) RegisterMetric(name string, metricType MetricType, description string, labels map[string]string) {
mr.mu.Lock()
defer mr.mu.Unlock()
mr.metrics[name] = &TimeSeries{
Name: name,
Type: metricType,
Description: description,
Labels: labels,
Values: make([]TimeSeriesPoint, 0),
MaxPoints: 1000, // Keep last 1000 points
}
}
// RecordValue records a value for a metric
func (mr *DetailedMetricsRegistry) RecordValue(name string, value float64) {
mr.mu.RLock()
metric, exists := mr.metrics[name]
mr.mu.RUnlock()
if !exists {
return
}
metric.mu.Lock()
defer metric.mu.Unlock()
point := TimeSeriesPoint{
Timestamp: time.Now(),
Value: value,
}
metric.Values = append(metric.Values, point)
// Keep only the last MaxPoints
if len(metric.Values) > metric.MaxPoints {
metric.Values = metric.Values[len(metric.Values)-metric.MaxPoints:]
}
}
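// Registering and recording a custom metric, given a *DetailedMetricsRegistry named
// registry (the metric name here is an example, not one of the server defaults):
//
//	registry.RegisterMetric("mq_consumer_lag", MetricTypeGauge, "Consumer lag in messages", nil)
//	registry.RecordValue("mq_consumer_lag", 42)
//
// RecordValue silently drops values for metrics that were never registered, so
// registration must happen first.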
// GetMetric returns a metric by name
func (mr *DetailedMetricsRegistry) GetMetric(name string) (*TimeSeries, bool) {
mr.mu.RLock()
defer mr.mu.RUnlock()
metric, exists := mr.metrics[name]
if !exists {
return nil, false
}
// Return a copy to prevent external modification
metric.mu.RLock()
defer metric.mu.RUnlock()
metricCopy := &TimeSeries{
Name: metric.Name,
Type: metric.Type,
Description: metric.Description,
Labels: make(map[string]string),
Values: make([]TimeSeriesPoint, len(metric.Values)),
MaxPoints: metric.MaxPoints,
}
for k, v := range metric.Labels {
metricCopy.Labels[k] = v
}
copy(metricCopy.Values, metric.Values)
return metricCopy, true
}
// GetAllMetrics returns copies of all registered metrics
func (mr *DetailedMetricsRegistry) GetAllMetrics() map[string]*TimeSeries {
// Collect names under the read lock, then copy each metric after releasing it.
// Calling GetMetric while still holding the read lock would acquire it recursively,
// which can deadlock if a writer is queued between the two acquisitions.
mr.mu.RLock()
names := make([]string, 0, len(mr.metrics))
for name := range mr.metrics {
names = append(names, name)
}
mr.mu.RUnlock()
result := make(map[string]*TimeSeries, len(names))
for _, name := range names {
if metric, ok := mr.GetMetric(name); ok {
result[name] = metric
}
}
return result
}
// NewSystemHealthChecker creates a new system health checker
func NewSystemHealthChecker(logger logger.Logger) *SystemHealthChecker {
checker := &SystemHealthChecker{
checks: make(map[string]HealthCheck),
results: make(map[string]*HealthCheckResult),
logger: logger,
}
// Register default health checks
checker.RegisterCheck(&MemoryHealthCheck{})
checker.RegisterCheck(&GoRoutineHealthCheck{})
checker.RegisterCheck(&DiskSpaceHealthCheck{})
return checker
}
// RegisterCheck registers a health check
func (shc *SystemHealthChecker) RegisterCheck(check HealthCheck) {
shc.mu.Lock()
defer shc.mu.Unlock()
shc.checks[check.Name()] = check
}
// RunChecks runs all health checks concurrently and returns their results
func (shc *SystemHealthChecker) RunChecks(ctx context.Context) map[string]*HealthCheckResult {
shc.mu.RLock()
checks := make(map[string]HealthCheck, len(shc.checks))
for name, check := range shc.checks {
checks[name] = check
}
shc.mu.RUnlock()
results := make(map[string]*HealthCheckResult, len(checks))
var resultsMu sync.Mutex // guards the local results map, which is written from multiple goroutines
var wg sync.WaitGroup
for name, check := range checks {
wg.Add(1)
go func(name string, check HealthCheck) {
defer wg.Done()
checkCtx, cancel := context.WithTimeout(ctx, check.Timeout())
defer cancel()
result := check.Check(checkCtx)
resultsMu.Lock()
results[name] = result
resultsMu.Unlock()
shc.mu.Lock()
shc.results[name] = result
shc.mu.Unlock()
}(name, check)
}
wg.Wait()
return results
}
// GetOverallHealth returns the overall system health
func (shc *SystemHealthChecker) GetOverallHealth() HealthStatus {
shc.mu.RLock()
defer shc.mu.RUnlock()
if len(shc.results) == 0 {
return HealthStatusUnknown
}
hasUnhealthy := false
hasWarning := false
for _, result := range shc.results {
switch result.Status {
case HealthStatusUnhealthy:
hasUnhealthy = true
case HealthStatusWarning:
hasWarning = true
}
}
if hasUnhealthy {
return HealthStatusUnhealthy
}
if hasWarning {
return HealthStatusWarning
}
return HealthStatusHealthy
}
// MemoryHealthCheck checks memory usage
type MemoryHealthCheck struct{}
func (mhc *MemoryHealthCheck) Name() string {
return "memory"
}
func (mhc *MemoryHealthCheck) Timeout() time.Duration {
return 5 * time.Second
}
func (mhc *MemoryHealthCheck) Check(ctx context.Context) *HealthCheckResult {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// Convert to MB
allocMB := float64(m.Alloc) / 1024 / 1024
sysMB := float64(m.Sys) / 1024 / 1024
status := HealthStatusHealthy
message := fmt.Sprintf("Memory usage: %.2f MB allocated, %.2f MB system", allocMB, sysMB)
// Simple thresholds (should be configurable); checked from most to least severe so
// only one qualifier is appended to the message
switch {
case allocMB > 2000: // 2GB
status = HealthStatusUnhealthy
message += " (critical memory usage)"
case allocMB > 1000: // 1GB
status = HealthStatusWarning
message += " (high memory usage)"
}
return &HealthCheckResult{
Name: mhc.Name(),
Status: status,
Message: message,
Timestamp: time.Now(),
Metadata: map[string]any{
"alloc_mb": allocMB,
"sys_mb": sysMB,
"gc_cycles": m.NumGC,
"goroutines": runtime.NumGoroutine(),
},
}
}
// GoRoutineHealthCheck checks goroutine count
type GoRoutineHealthCheck struct{}
func (ghc *GoRoutineHealthCheck) Name() string {
return "goroutines"
}
func (ghc *GoRoutineHealthCheck) Timeout() time.Duration {
return 5 * time.Second
}
func (ghc *GoRoutineHealthCheck) Check(ctx context.Context) *HealthCheckResult {
count := runtime.NumGoroutine()
status := HealthStatusHealthy
message := fmt.Sprintf("Goroutines: %d", count)
// Simple thresholds, checked from most to least severe
switch {
case count > 5000:
status = HealthStatusUnhealthy
message += " (critical goroutine count)"
case count > 1000:
status = HealthStatusWarning
message += " (high goroutine count)"
}
return &HealthCheckResult{
Name: ghc.Name(),
Status: status,
Message: message,
Timestamp: time.Now(),
Metadata: map[string]any{
"count": count,
},
}
}
// DiskSpaceHealthCheck checks available disk space
type DiskSpaceHealthCheck struct{}
func (dshc *DiskSpaceHealthCheck) Name() string {
return "disk_space"
}
func (dshc *DiskSpaceHealthCheck) Timeout() time.Duration {
return 5 * time.Second
}
func (dshc *DiskSpaceHealthCheck) Check(ctx context.Context) *HealthCheckResult {
// This is a simplified implementation
// In production, you would check actual disk space
return &HealthCheckResult{
Name: dshc.Name(),
Status: HealthStatusHealthy,
Message: "Disk space OK",
Timestamp: time.Now(),
Metadata: map[string]any{
"available_gb": 100.0, // Placeholder
},
}
}
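// A real implementation could query the filesystem. A minimal sketch using
// golang.org/x/sys/unix (Unix-only; the mount point and the 1 GB / 5 GB thresholds
// are assumptions for illustration):
//
//	var st unix.Statfs_t
//	if err := unix.Statfs("/", &st); err != nil {
//		return &HealthCheckResult{Name: dshc.Name(), Status: HealthStatusUnknown, Message: err.Error(), Timestamp: time.Now()}
//	}
//	availableGB := float64(st.Bavail) * float64(st.Bsize) / (1 << 30)
//	status := HealthStatusHealthy
//	if availableGB < 1 {
//		status = HealthStatusUnhealthy
//	} else if availableGB < 5 {
//		status = HealthStatusWarning
//	}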
// NewAlertManager creates a new alert manager
func NewAlertManager(logger logger.Logger) *AlertManager {
return &AlertManager{
rules: make([]AlertRule, 0),
alerts: make([]ActiveAlert, 0),
notifiers: make([]AlertNotifier, 0),
logger: logger,
}
}
// AddRule adds an alert rule
func (am *AlertManager) AddRule(rule AlertRule) {
am.mu.Lock()
defer am.mu.Unlock()
am.rules = append(am.rules, rule)
}
// AddNotifier adds an alert notifier
func (am *AlertManager) AddNotifier(notifier AlertNotifier) {
am.mu.Lock()
defer am.mu.Unlock()
am.notifiers = append(am.notifiers, notifier)
}
// EvaluateRules evaluates all alert rules against current metrics
func (am *AlertManager) EvaluateRules(registry *DetailedMetricsRegistry) {
am.mu.Lock()
defer am.mu.Unlock()
now := time.Now()
for _, rule := range am.rules {
if !rule.Enabled {
continue
}
metric, exists := registry.GetMetric(rule.Metric)
if !exists {
continue
}
if len(metric.Values) == 0 {
continue
}
// Get the latest value
latestValue := metric.Values[len(metric.Values)-1].Value
// Check if condition is met
conditionMet := false
switch rule.Condition {
case "gt":
conditionMet = latestValue > rule.Threshold
case "gte":
conditionMet = latestValue >= rule.Threshold
case "lt":
conditionMet = latestValue < rule.Threshold
case "lte":
conditionMet = latestValue <= rule.Threshold
case "eq":
conditionMet = latestValue == rule.Threshold
}
// Find existing alert
var existingAlert *ActiveAlert
for i := range am.alerts {
if am.alerts[i].Rule.Name == rule.Name && am.alerts[i].Status == AlertStatusFiring {
existingAlert = &am.alerts[i]
break
}
}
if conditionMet {
if existingAlert == nil {
// Create new alert
alert := ActiveAlert{
Rule: rule,
Value: latestValue,
StartsAt: now,
Labels: rule.Labels,
Annotations: rule.Annotations,
Status: AlertStatusFiring,
}
am.alerts = append(am.alerts, alert)
// Notify
for _, notifier := range am.notifiers {
go func(n AlertNotifier, a ActiveAlert) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := n.Notify(ctx, a); err != nil {
am.logger.Error("Failed to send alert notification",
logger.Field{Key: "notifier", Value: n.Name()},
logger.Field{Key: "alert", Value: a.Rule.Name},
logger.Field{Key: "error", Value: err.Error()})
}
}(notifier, alert)
}
} else {
// Update existing alert
existingAlert.Value = latestValue
}
} else if existingAlert != nil {
// Resolve alert
endTime := now
existingAlert.EndsAt = &endTime
existingAlert.Status = AlertStatusResolved
// Notify resolution
for _, notifier := range am.notifiers {
go func(n AlertNotifier, a ActiveAlert) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := n.Notify(ctx, a); err != nil {
am.logger.Error("Failed to send alert resolution notification",
logger.Field{Key: "notifier", Value: n.Name()},
logger.Field{Key: "alert", Value: a.Rule.Name},
logger.Field{Key: "error", Value: err.Error()})
}
}(notifier, *existingAlert)
}
}
}
}
// AddAlertRule adds an alert rule to the metrics server
func (ms *MetricsServer) AddAlertRule(rule AlertRule) {
ms.alertManager.AddRule(rule)
}
// AddAlertNotifier adds an alert notifier to the metrics server
func (ms *MetricsServer) AddAlertNotifier(notifier AlertNotifier) {
ms.alertManager.AddNotifier(notifier)
}
// Start starts the metrics server
func (ms *MetricsServer) Start(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&ms.isRunning, 0, 1) {
return fmt.Errorf("metrics server is already running")
}
// Register default metrics
ms.registerDefaultMetrics()
// Setup HTTP server
mux := http.NewServeMux()
mux.HandleFunc("/metrics", ms.handleMetrics)
mux.HandleFunc("/health", ms.handleHealth)
mux.HandleFunc("/alerts", ms.handleAlerts)
ms.server = &http.Server{
Addr: fmt.Sprintf(":%d", ms.config.MetricsPort),
Handler: mux,
}
// Start collection routines
ms.wg.Add(1)
go ms.metricsCollectionLoop(ctx)
ms.wg.Add(1)
go ms.healthCheckLoop(ctx)
ms.wg.Add(1)
go ms.alertEvaluationLoop(ctx)
// Start HTTP server
go func() {
ms.logger.Info("Metrics server starting",
logger.Field{Key: "port", Value: ms.config.MetricsPort})
if err := ms.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
ms.logger.Error("Metrics server error",
logger.Field{Key: "error", Value: err.Error()})
}
}()
return nil
}
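// Once running, the endpoints can be polled over HTTP. For example (the port is
// whatever MetricsPort was configured to; 9090 here is only an example):
//
//	resp, err := http.Get("http://localhost:9090/health")
//	if err == nil {
//		defer resp.Body.Close()
//		var health map[string]any
//		_ = json.NewDecoder(resp.Body).Decode(&health)
//	}
//
// /metrics returns all time series as JSON and /alerts returns active and resolved
// alerts, newest first.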
// Stop stops the metrics server
func (ms *MetricsServer) Stop() error {
if !atomic.CompareAndSwapInt32(&ms.isRunning, 1, 0) {
return nil
}
close(ms.shutdown)
// Stop HTTP server
if ms.server != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := ms.server.Shutdown(ctx); err != nil {
ms.logger.Error("Metrics server shutdown error",
logger.Field{Key: "error", Value: err.Error()})
}
}
// Wait for goroutines to finish
ms.wg.Wait()
ms.logger.Info("Metrics server stopped")
return nil
}
// registerDefaultMetrics registers default system metrics
func (ms *MetricsServer) registerDefaultMetrics() {
ms.registry.RegisterMetric("mq_broker_connections_total", MetricTypeGauge, "Total number of broker connections", nil)
ms.registry.RegisterMetric("mq_messages_processed_total", MetricTypeCounter, "Total number of processed messages", nil)
ms.registry.RegisterMetric("mq_messages_failed_total", MetricTypeCounter, "Total number of failed messages", nil)
ms.registry.RegisterMetric("mq_queue_depth", MetricTypeGauge, "Current queue depth", nil)
ms.registry.RegisterMetric("mq_memory_usage_bytes", MetricTypeGauge, "Memory usage in bytes", nil)
ms.registry.RegisterMetric("mq_goroutines_total", MetricTypeGauge, "Total number of goroutines", nil)
ms.registry.RegisterMetric("mq_gc_duration_seconds", MetricTypeGauge, "GC duration in seconds", nil)
}
// metricsCollectionLoop collects metrics periodically
func (ms *MetricsServer) metricsCollectionLoop(ctx context.Context) {
defer ms.wg.Done()
ticker := time.NewTicker(1 * time.Minute) // collection interval is currently fixed at 1 minute
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ms.shutdown:
return
case <-ticker.C:
ms.collectSystemMetrics()
ms.collectBrokerMetrics()
}
}
}
// collectSystemMetrics collects system-level metrics
func (ms *MetricsServer) collectSystemMetrics() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
ms.registry.RecordValue("mq_memory_usage_bytes", float64(m.Alloc))
ms.registry.RecordValue("mq_goroutines_total", float64(runtime.NumGoroutine()))
ms.registry.RecordValue("mq_gc_duration_seconds", float64(m.PauseTotalNs)/1e9)
}
// collectBrokerMetrics collects broker-specific metrics
func (ms *MetricsServer) collectBrokerMetrics() {
if ms.broker == nil {
return
}
// Collect connection metrics
activeConns := ms.broker.connectionPool.GetActiveConnections()
ms.registry.RecordValue("mq_broker_connections_total", float64(activeConns))
// Collect queue metrics
totalDepth := 0
ms.broker.queues.ForEach(func(name string, queue *Queue) bool {
depth := len(queue.tasks)
totalDepth += depth
// Record per-queue metrics with labels; register lazily so that re-registering on
// every collection pass does not wipe the accumulated series
queueMetric := fmt.Sprintf("mq_queue_depth{queue=\"%s\"}", name)
if _, exists := ms.registry.GetMetric(queueMetric); !exists {
ms.registry.RegisterMetric(queueMetric, MetricTypeGauge, "Queue depth for specific queue", map[string]string{"queue": name})
}
ms.registry.RecordValue(queueMetric, float64(depth))
return true
})
ms.registry.RecordValue("mq_queue_depth", float64(totalDepth))
}
// healthCheckLoop runs health checks periodically
func (ms *MetricsServer) healthCheckLoop(ctx context.Context) {
defer ms.wg.Done()
ticker := time.NewTicker(ms.config.HealthCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ms.shutdown:
return
case <-ticker.C:
ms.healthChecker.RunChecks(ctx)
}
}
}
// alertEvaluationLoop evaluates alerts periodically
func (ms *MetricsServer) alertEvaluationLoop(ctx context.Context) {
defer ms.wg.Done()
ticker := time.NewTicker(30 * time.Second) // Evaluate every 30 seconds
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ms.shutdown:
return
case <-ticker.C:
ms.alertManager.EvaluateRules(ms.registry)
}
}
}
// handleMetrics handles the /metrics endpoint
func (ms *MetricsServer) handleMetrics(w http.ResponseWriter, r *http.Request) {
metrics := ms.registry.GetAllMetrics()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"timestamp": time.Now(),
"metrics": metrics,
})
}
// handleHealth handles the /health endpoint
func (ms *MetricsServer) handleHealth(w http.ResponseWriter, r *http.Request) {
results := ms.healthChecker.RunChecks(r.Context())
overallHealth := ms.healthChecker.GetOverallHealth()
response := map[string]any{
"status": overallHealth,
"timestamp": time.Now(),
"checks": results,
}
w.Header().Set("Content-Type", "application/json")
// Set HTTP status based on health
switch overallHealth {
case HealthStatusHealthy:
w.WriteHeader(http.StatusOK)
case HealthStatusWarning:
w.WriteHeader(http.StatusOK) // Still OK but with warnings
case HealthStatusUnhealthy:
w.WriteHeader(http.StatusServiceUnavailable)
default:
w.WriteHeader(http.StatusInternalServerError)
}
json.NewEncoder(w).Encode(response)
}
// handleAlerts handles the /alerts endpoint
func (ms *MetricsServer) handleAlerts(w http.ResponseWriter, r *http.Request) {
ms.alertManager.mu.RLock()
alerts := make([]ActiveAlert, len(ms.alertManager.alerts))
copy(alerts, ms.alertManager.alerts)
ms.alertManager.mu.RUnlock()
// Sort alerts by start time (newest first)
sort.Slice(alerts, func(i, j int) bool {
return alerts[i].StartsAt.After(alerts[j].StartsAt)
})
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"timestamp": time.Now(),
"alerts": alerts,
})
}
// LogNotifier sends alerts to logs
type LogNotifier struct {
logger logger.Logger
}
func NewLogNotifier(logger logger.Logger) *LogNotifier {
return &LogNotifier{logger: logger}
}
func (ln *LogNotifier) Name() string {
return "log"
}
func (ln *LogNotifier) Notify(ctx context.Context, alert ActiveAlert) error {
level := "info"
if alert.Status == AlertStatusFiring {
level = "error"
}
message := fmt.Sprintf("Alert %s: %s (value: %.2f, threshold: %.2f)",
alert.Status, alert.Rule.Name, alert.Value, alert.Rule.Threshold)
if level == "error" {
ln.logger.Error(message,
logger.Field{Key: "alert_name", Value: alert.Rule.Name},
logger.Field{Key: "alert_status", Value: string(alert.Status)},
logger.Field{Key: "value", Value: alert.Value},
logger.Field{Key: "threshold", Value: alert.Rule.Threshold})
} else {
ln.logger.Info(message,
logger.Field{Key: "alert_name", Value: alert.Rule.Name},
logger.Field{Key: "alert_status", Value: string(alert.Status)},
logger.Field{Key: "value", Value: alert.Value},
logger.Field{Key: "threshold", Value: alert.Rule.Threshold})
}
return nil
}