// mq/dag/monitoring.go

package dag

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/oarkflow/mq"
	"github.com/oarkflow/mq/logger"
)

// MonitoringMetrics holds comprehensive metrics for DAG monitoring
type MonitoringMetrics struct {
	mu                   sync.RWMutex
	TasksTotal           int64
	TasksCompleted       int64
	TasksFailed          int64
	TasksCancelled       int64
	TasksInProgress      int64
	NodesExecuted        map[string]int64
	NodeExecutionTimes   map[string][]time.Duration
	NodeFailures         map[string]int64
	AverageExecutionTime time.Duration
	TotalExecutionTime   time.Duration
	StartTime            time.Time
	LastTaskCompletedAt  time.Time
	ActiveTasks          map[string]time.Time
	NodeProcessingStats  map[string]*NodeStats
}

// NodeStats holds statistics for individual nodes
type NodeStats struct {
	ExecutionCount   int64
	SuccessCount     int64
	FailureCount     int64
	TotalDuration    time.Duration
	AverageDuration  time.Duration
	MinDuration      time.Duration
	MaxDuration      time.Duration
	LastExecuted     time.Time
	LastSuccess      time.Time
	LastFailure      time.Time
	CurrentlyRunning int64
}

// NewMonitoringMetrics creates a new metrics instance
func NewMonitoringMetrics() *MonitoringMetrics {
	return &MonitoringMetrics{
		NodesExecuted:       make(map[string]int64),
		NodeExecutionTimes:  make(map[string][]time.Duration),
		NodeFailures:        make(map[string]int64),
		StartTime:           time.Now(),
		ActiveTasks:         make(map[string]time.Time),
		NodeProcessingStats: make(map[string]*NodeStats),
	}
}
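
// Illustrative usage sketch for the recording API below (the task ID, node ID,
// and success flag are placeholders for this example, not values used by the
// package itself):
//
//	metrics := NewMonitoringMetrics()
//	metrics.RecordTaskStart("task-1")
//	metrics.RecordNodeStart("node-a")
//	started := time.Now()
//	// ... node processing ...
//	metrics.RecordNodeEnd("node-a")
//	metrics.RecordNodeExecution("node-a", time.Since(started), true)
//	metrics.RecordTaskCompletion("task-1", mq.Completed)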

// RecordTaskStart records the start of a task
func (m *MonitoringMetrics) RecordTaskStart(taskID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.TasksTotal++
	m.TasksInProgress++
	m.ActiveTasks[taskID] = time.Now()
}

// RecordTaskCompletion records task completion
func (m *MonitoringMetrics) RecordTaskCompletion(taskID string, status mq.Status) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.TasksInProgress--
	if m.TasksInProgress < 0 {
		m.TasksInProgress = 0
	}
	switch status {
	case mq.Completed:
		m.TasksCompleted++
	case mq.Failed:
		m.TasksFailed++
	case mq.Cancelled:
		m.TasksCancelled++
	}
	m.LastTaskCompletedAt = time.Now()
	delete(m.ActiveTasks, taskID)
}

// RecordNodeExecution records node execution metrics
func (m *MonitoringMetrics) RecordNodeExecution(nodeID string, duration time.Duration, success bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Initialize node stats if they do not exist yet
	if _, exists := m.NodeProcessingStats[nodeID]; !exists {
		m.NodeProcessingStats[nodeID] = &NodeStats{
			MinDuration: duration,
			MaxDuration: duration,
		}
	}
	stats := m.NodeProcessingStats[nodeID]
	stats.ExecutionCount++
	stats.TotalDuration += duration
	stats.AverageDuration = stats.TotalDuration / time.Duration(stats.ExecutionCount)
	stats.LastExecuted = time.Now()
	if duration < stats.MinDuration || stats.MinDuration == 0 {
		stats.MinDuration = duration
	}
	if duration > stats.MaxDuration {
		stats.MaxDuration = duration
	}
	if success {
		stats.SuccessCount++
		stats.LastSuccess = time.Now()
	} else {
		stats.FailureCount++
		stats.LastFailure = time.Now()
		m.NodeFailures[nodeID]++
	}
	// Legacy tracking
	m.NodesExecuted[nodeID]++
	m.NodeExecutionTimes[nodeID] = append(m.NodeExecutionTimes[nodeID], duration)
	// Keep only last 100 execution times per node to prevent memory bloat
	if len(m.NodeExecutionTimes[nodeID]) > 100 {
		m.NodeExecutionTimes[nodeID] = m.NodeExecutionTimes[nodeID][len(m.NodeExecutionTimes[nodeID])-100:]
	}
	// Calculate average execution time over the retained samples (at most the
	// last 100 durations per node)
	var totalDuration time.Duration
	var totalExecutions int64
	for _, durations := range m.NodeExecutionTimes {
		for _, d := range durations {
			totalDuration += d
			totalExecutions++
		}
	}
	if totalExecutions > 0 {
		m.AverageExecutionTime = totalDuration / time.Duration(totalExecutions)
	}
	m.TotalExecutionTime += duration
}

// RecordNodeStart records when a node starts processing
func (m *MonitoringMetrics) RecordNodeStart(nodeID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if stats, exists := m.NodeProcessingStats[nodeID]; exists {
		stats.CurrentlyRunning++
	} else {
		m.NodeProcessingStats[nodeID] = &NodeStats{
			CurrentlyRunning: 1,
		}
	}
}

// RecordNodeEnd records when a node finishes processing
func (m *MonitoringMetrics) RecordNodeEnd(nodeID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if stats, exists := m.NodeProcessingStats[nodeID]; exists {
		stats.CurrentlyRunning--
		if stats.CurrentlyRunning < 0 {
			stats.CurrentlyRunning = 0
		}
	}
}

// GetSnapshot returns a snapshot of current metrics
func (m *MonitoringMetrics) GetSnapshot() *MonitoringMetrics {
	m.mu.RLock()
	defer m.mu.RUnlock()
	snapshot := &MonitoringMetrics{
		TasksTotal:           m.TasksTotal,
		TasksCompleted:       m.TasksCompleted,
		TasksFailed:          m.TasksFailed,
		TasksCancelled:       m.TasksCancelled,
		TasksInProgress:      m.TasksInProgress,
		AverageExecutionTime: m.AverageExecutionTime,
		TotalExecutionTime:   m.TotalExecutionTime,
		StartTime:            m.StartTime,
		LastTaskCompletedAt:  m.LastTaskCompletedAt,
		NodesExecuted:        make(map[string]int64),
		NodeExecutionTimes:   make(map[string][]time.Duration),
		NodeFailures:         make(map[string]int64),
		ActiveTasks:          make(map[string]time.Time),
		NodeProcessingStats:  make(map[string]*NodeStats),
	}
	// Deep copy maps
	for k, v := range m.NodesExecuted {
		snapshot.NodesExecuted[k] = v
	}
	for k, v := range m.NodeFailures {
		snapshot.NodeFailures[k] = v
	}
	for k, v := range m.ActiveTasks {
		snapshot.ActiveTasks[k] = v
	}
	for k, v := range m.NodeProcessingStats {
		statsCopy := *v
		snapshot.NodeProcessingStats[k] = &statsCopy
	}
	for k, v := range m.NodeExecutionTimes {
		timesCopy := make([]time.Duration, len(v))
		copy(timesCopy, v)
		snapshot.NodeExecutionTimes[k] = timesCopy
	}
	return snapshot
}
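
// Example of deriving values from a snapshot (a hedged sketch continuing the
// usage example above; the success-rate calculation is an illustration, not
// something the package computes for you):
//
//	snap := metrics.GetSnapshot()
//	if snap.TasksTotal > 0 {
//		rate := float64(snap.TasksCompleted) / float64(snap.TasksTotal)
//		fmt.Printf("completed %d of %d tasks (%.1f%%)\n", snap.TasksCompleted, snap.TasksTotal, rate*100)
//	}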

// GetNodeStats returns statistics for a specific node
func (m *MonitoringMetrics) GetNodeStats(nodeID string) *NodeStats {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if stats, exists := m.NodeProcessingStats[nodeID]; exists {
		statsCopy := *stats
		return &statsCopy
	}
	return nil
}

// Monitor provides comprehensive monitoring capabilities for DAG
type Monitor struct {
	dag        *DAG
	metrics    *MonitoringMetrics
	logger     logger.Logger
	thresholds *AlertThresholds
	handlers   []AlertHandler
	stopCh     chan struct{}
	running    bool
	mu         sync.RWMutex
}

// AlertThresholds defines thresholds for alerting
type AlertThresholds struct {
	MaxFailureRate      float64       `json:"max_failure_rate"`
	MaxExecutionTime    time.Duration `json:"max_execution_time"`
	MaxTasksInProgress  int64         `json:"max_tasks_in_progress"`
	MinSuccessRate      float64       `json:"min_success_rate"`
	MaxNodeFailures     int64         `json:"max_node_failures"`
	HealthCheckInterval time.Duration `json:"health_check_interval"`
}

// AlertHandler defines interface for handling alerts
type AlertHandler interface {
	HandleAlert(alert Alert) error
}
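
// A minimal handler sketch (illustrative only; logAlertHandler is not part of
// this package) that forwards alerts to a logger.Logger using the same
// Field-based API seen elsewhere in this file:
//
//	type logAlertHandler struct {
//		log logger.Logger
//	}
//
//	func (h logAlertHandler) HandleAlert(alert Alert) error {
//		h.log.Warn("alert received",
//			logger.Field{Key: "type", Value: string(alert.Type)},
//			logger.Field{Key: "severity", Value: string(alert.Severity)},
//			logger.Field{Key: "message", Value: alert.Message},
//		)
//		return nil
//	}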

// Alert represents a monitoring alert
type Alert struct {
	ID          string         `json:"id"`
	Timestamp   time.Time      `json:"timestamp"`
	Severity    AlertSeverity  `json:"severity"`
	Type        AlertType      `json:"type"`
	Message     string         `json:"message"`
	Details     map[string]any `json:"details"`
	NodeID      string         `json:"node_id,omitempty"`
	TaskID      string         `json:"task_id,omitempty"`
	Threshold   any            `json:"threshold,omitempty"`
	ActualValue any            `json:"actual_value,omitempty"`
}

// AlertSeverity indicates how serious an alert is
type AlertSeverity string

const (
	AlertSeverityInfo     AlertSeverity = "info"
	AlertSeverityWarning  AlertSeverity = "warning"
	AlertSeverityCritical AlertSeverity = "critical"
)

// AlertType identifies which condition produced an alert
type AlertType string

const (
	AlertTypeFailureRate    AlertType = "failure_rate"
	AlertTypeExecutionTime  AlertType = "execution_time"
	AlertTypeTaskLoad       AlertType = "task_load"
	AlertTypeNodeFailures   AlertType = "node_failures"
	AlertTypeCircuitBreaker AlertType = "circuit_breaker"
	AlertTypeHealthCheck    AlertType = "health_check"
)

// NewMonitor creates a new DAG monitor with default alert thresholds
func NewMonitor(dag *DAG, logger logger.Logger) *Monitor {
	return &Monitor{
		dag:     dag,
		metrics: NewMonitoringMetrics(),
		logger:  logger,
		thresholds: &AlertThresholds{
			MaxFailureRate:      0.1, // 10%
			MaxExecutionTime:    5 * time.Minute,
			MaxTasksInProgress:  1000,
			MinSuccessRate:      0.9, // 90%
			MaxNodeFailures:     10,
			HealthCheckInterval: 30 * time.Second,
		},
		handlers: make([]AlertHandler, 0),
		stopCh:   make(chan struct{}),
	}
}
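
// Typical wiring sketch (assumes an existing *DAG value d, a logger l, and a
// context ctx; the handler value refers to the illustrative logAlertHandler
// sketched earlier):
//
//	monitor := NewMonitor(d, l)
//	monitor.AddAlertHandler(logAlertHandler{log: l})
//	monitor.Start(ctx)
//	defer monitor.Stop()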

// Start begins monitoring
func (m *Monitor) Start(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.running {
		return
	}
	m.running = true
	go m.healthCheckRoutine(ctx)
	m.logger.Info("DAG monitoring started")
}

// Stop stops monitoring. Because the stop channel is closed here and never
// recreated, a Monitor is not restartable after Stop
func (m *Monitor) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.running {
		return
	}
	m.running = false
	close(m.stopCh)
	m.logger.Info("DAG monitoring stopped")
}

// SetAlertThresholds updates alert thresholds. Note that HealthCheckInterval
// is only read when monitoring starts, so changing it here has no effect on an
// already-running health check loop
func (m *Monitor) SetAlertThresholds(thresholds *AlertThresholds) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.thresholds = thresholds
}
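
// Example of tightening the default thresholds (the values here are
// illustrative only):
//
//	monitor.SetAlertThresholds(&AlertThresholds{
//		MaxFailureRate:      0.05,
//		MaxExecutionTime:    2 * time.Minute,
//		MaxTasksInProgress:  500,
//		MinSuccessRate:      0.95,
//		MaxNodeFailures:     5,
//		HealthCheckInterval: 15 * time.Second,
//	})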

// AddAlertHandler adds an alert handler
func (m *Monitor) AddAlertHandler(handler AlertHandler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers = append(m.handlers, handler)
}

// GetMetrics returns a snapshot of the current metrics
func (m *Monitor) GetMetrics() *MonitoringMetrics {
	return m.metrics.GetSnapshot()
}

// healthCheckRoutine performs periodic health checks
func (m *Monitor) healthCheckRoutine(ctx context.Context) {
	// Read the interval under the lock to avoid racing with SetAlertThresholds.
	m.mu.RLock()
	interval := m.thresholds.HealthCheckInterval
	m.mu.RUnlock()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-m.stopCh:
			return
		case <-ticker.C:
			m.performHealthCheck()
		}
	}
}

// performHealthCheck checks system health and triggers alerts
func (m *Monitor) performHealthCheck() {
	metrics := m.GetMetrics()
	// Copy the thresholds under the read lock so a concurrent call to
	// SetAlertThresholds cannot race with the checks below.
	m.mu.RLock()
	thresholds := *m.thresholds
	m.mu.RUnlock()
	// Check failure rate
	if metrics.TasksTotal > 0 {
		failureRate := float64(metrics.TasksFailed) / float64(metrics.TasksTotal)
		if failureRate > thresholds.MaxFailureRate {
			m.triggerAlert(Alert{
				ID:          mq.NewID(),
				Timestamp:   time.Now(),
				Severity:    AlertSeverityCritical,
				Type:        AlertTypeFailureRate,
				Message:     "High failure rate detected",
				Threshold:   thresholds.MaxFailureRate,
				ActualValue: failureRate,
				Details: map[string]any{
					"failed_tasks": metrics.TasksFailed,
					"total_tasks":  metrics.TasksTotal,
				},
			})
		}
	}
	// Check task load
	if metrics.TasksInProgress > thresholds.MaxTasksInProgress {
		m.triggerAlert(Alert{
			ID:          mq.NewID(),
			Timestamp:   time.Now(),
			Severity:    AlertSeverityWarning,
			Type:        AlertTypeTaskLoad,
			Message:     "High task load detected",
			Threshold:   thresholds.MaxTasksInProgress,
			ActualValue: metrics.TasksInProgress,
			Details: map[string]any{
				"tasks_in_progress": metrics.TasksInProgress,
			},
		})
	}
	// Check node failures
	for nodeID, failures := range metrics.NodeFailures {
		if failures > thresholds.MaxNodeFailures {
			m.triggerAlert(Alert{
				ID:          mq.NewID(),
				Timestamp:   time.Now(),
				Severity:    AlertSeverityCritical,
				Type:        AlertTypeNodeFailures,
				Message:     fmt.Sprintf("Node %s has too many failures", nodeID),
				NodeID:      nodeID,
				Threshold:   thresholds.MaxNodeFailures,
				ActualValue: failures,
				Details: map[string]any{
					"node_id":  nodeID,
					"failures": failures,
				},
			})
		}
	}
	// Check execution time
	if metrics.AverageExecutionTime > thresholds.MaxExecutionTime {
		m.triggerAlert(Alert{
			ID:          mq.NewID(),
			Timestamp:   time.Now(),
			Severity:    AlertSeverityWarning,
			Type:        AlertTypeExecutionTime,
			Message:     "Average execution time is too high",
			Threshold:   thresholds.MaxExecutionTime,
			ActualValue: metrics.AverageExecutionTime,
			Details: map[string]any{
				"average_execution_time": metrics.AverageExecutionTime.String(),
			},
		})
	}
}

// triggerAlert sends alerts to all registered handlers
func (m *Monitor) triggerAlert(alert Alert) {
	m.logger.Warn("Alert triggered",
		logger.Field{Key: "alert_id", Value: alert.ID},
		logger.Field{Key: "type", Value: string(alert.Type)},
		logger.Field{Key: "severity", Value: string(alert.Severity)},
		logger.Field{Key: "message", Value: alert.Message},
	)
	// Copy the handler list under the read lock so a concurrent AddAlertHandler
	// cannot race with this iteration.
	m.mu.RLock()
	handlers := make([]AlertHandler, len(m.handlers))
	copy(handlers, m.handlers)
	m.mu.RUnlock()
	for _, handler := range handlers {
		go func(h AlertHandler, a Alert) {
			if err := h.HandleAlert(a); err != nil {
				m.logger.Error("Alert handler error",
					logger.Field{Key: "error", Value: err.Error()},
					logger.Field{Key: "alert_id", Value: a.ID},
				)
			}
		}(handler, alert)
	}
}