feat: update

This commit is contained in:
sujit
2025-09-17 06:50:38 +05:45
parent 098360584a
commit 88d8fa02fd
8 changed files with 1412 additions and 10 deletions

View File

@@ -419,7 +419,13 @@ func (c *Codec) ReadMessage(ctx context.Context, conn net.Conn) (*Message, error
func (c *Codec) GetStats() Stats {
c.stats.mu.RLock()
defer c.stats.mu.RUnlock()
return *c.stats
return Stats{
MessagesSent: c.stats.MessagesSent,
MessagesReceived: c.stats.MessagesReceived,
BytesSent: c.stats.BytesSent,
BytesReceived: c.stats.BytesReceived,
Errors: c.stats.Errors,
}
}
// ResetStats resets codec statistics

View File

@@ -263,7 +263,7 @@ func (tm *DAG) SVGViewerHTML(svgContent string) string {
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
margin: 0;
background: linear-gradient(135deg, #667eea 0%%, #764ba2 100%%);
background: linear-gradient(135deg, #667eea 0%%%%, #764ba2 100%%%%);
min-height: 100vh;
display: flex;
flex-direction: column;
@@ -339,8 +339,8 @@ func (tm *DAG) SVGViewerHTML(svgContent string) string {
}
.svg-container {
width: 100%%;
height: 100%%;
width: 100%%%%;
height: 100%%%%;
cursor: grab;
position: relative;
overflow: hidden;
@@ -357,8 +357,8 @@ func (tm *DAG) SVGViewerHTML(svgContent string) string {
user-select: none;
transform-origin: center center;
transition: transform 0.2s ease-out;
max-width: 100%%;
max-height: 100%%;
max-width: 100%%%%;
max-height: 100%%%%;
}
.svg-wrapper svg {
@@ -523,7 +523,7 @@ func (tm *DAG) SVGViewerHTML(svgContent string) string {
const scaleX = availableWidth / svgWidth;
const scaleY = availableHeight / svgHeight;
initialScale = Math.min(scaleX, scaleY, 1); // Don't scale up beyond 100%
initialScale = Math.min(scaleX, scaleY, 1); // Don't scale up beyond 100%%%%
// Reset position
currentX = 0;

View File

@@ -32,6 +32,7 @@ func (h *DataHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
}
var result map[string]any
var conditionStatus string
switch operation {
case "sort":
result = h.sortData(data)
@@ -41,6 +42,11 @@ func (h *DataHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
result = h.calculateFields(data)
case "conditional_set":
result = h.conditionalSet(data)
// Extract condition status from result
if status, ok := result["_condition_status"].(string); ok {
conditionStatus = status
delete(result, "_condition_status") // Remove from payload
}
case "type_cast":
result = h.typeCast(data)
case "validate_fields":
@@ -60,7 +66,11 @@ func (h *DataHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err)}
}
return mq.Result{Payload: resultPayload, Ctx: ctx}
return mq.Result{
Payload: resultPayload,
Ctx: ctx,
ConditionStatus: conditionStatus,
}
}
func (h *DataHandler) sortData(data map[string]any) map[string]any {
@@ -186,6 +196,9 @@ func (h *DataHandler) conditionalSet(data map[string]any) map[string]any {
result[key] = value
}
// Track which condition was met for setting ConditionStatus
var metCondition string
for targetField, condConfig := range conditions {
condition := condConfig["condition"].(string)
ifTrue := condConfig["if_true"]
@@ -193,11 +206,19 @@ func (h *DataHandler) conditionalSet(data map[string]any) map[string]any {
if h.evaluateCondition(data, condition) {
result[targetField] = ifTrue
if metCondition == "" { // Take the first met condition
metCondition = condition
}
} else {
result[targetField] = ifFalse
}
}
// Set condition status if any condition was evaluated
if metCondition != "" {
result["_condition_status"] = metCondition
}
return result
}

577
performance.go Normal file
View File

@@ -0,0 +1,577 @@
package mq
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// PerformanceOptimizer provides performance optimization features
type PerformanceOptimizer struct {
metricsCollector *MetricsCollector
workerPool *Pool
config *PerformanceConfig
isEnabled int32
shutdown chan struct{}
wg sync.WaitGroup
}
// PerformanceConfig holds performance optimization settings
type PerformanceConfig struct {
EnableGCOptimization bool `json:"enable_gc_optimization"`
EnableMemoryPooling bool `json:"enable_memory_pooling"`
EnableWorkerAutoscaling bool `json:"enable_worker_autoscaling"`
GCTargetPercentage int `json:"gc_target_percentage"`
MemoryPoolSize int `json:"memory_pool_size"`
WorkerScalingInterval time.Duration `json:"worker_scaling_interval"`
PerformanceCheckInterval time.Duration `json:"performance_check_interval"`
MaxWorkers int `json:"max_workers"`
MinWorkers int `json:"min_workers"`
TargetLatency time.Duration `json:"target_latency"`
MemoryThreshold int64 `json:"memory_threshold"`
}
// MemoryPool provides object pooling for memory optimization
type MemoryPool struct {
pool sync.Pool
size int
}
// PerformanceMetrics holds performance-related metrics
type PerformanceMetrics struct {
AvgTaskLatency time.Duration `json:"avg_task_latency"`
Throughput float64 `json:"throughput"`
MemoryUsage int64 `json:"memory_usage"`
GCCycles uint32 `json:"gc_cycles"`
HeapObjects uint64 `json:"heap_objects"`
WorkerUtilization float64 `json:"worker_utilization"`
QueueDepth int `json:"queue_depth"`
ErrorRate float64 `json:"error_rate"`
Timestamp time.Time `json:"timestamp"`
}
// NewPerformanceOptimizer creates a new performance optimizer
func NewPerformanceOptimizer(metricsCollector *MetricsCollector, workerPool *Pool) *PerformanceOptimizer {
config := &PerformanceConfig{
EnableGCOptimization: true,
EnableMemoryPooling: true,
EnableWorkerAutoscaling: true,
GCTargetPercentage: 80,
MemoryPoolSize: 1024,
WorkerScalingInterval: 30 * time.Second,
PerformanceCheckInterval: 10 * time.Second,
MaxWorkers: 100,
MinWorkers: 1,
TargetLatency: 100 * time.Millisecond,
MemoryThreshold: 512 * 1024 * 1024, // 512MB
}
return &PerformanceOptimizer{
metricsCollector: metricsCollector,
workerPool: workerPool,
config: config,
shutdown: make(chan struct{}),
}
}
// Start starts the performance optimizer
func (po *PerformanceOptimizer) Start(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&po.isEnabled, 0, 1) {
return nil // Already started
}
// Log starting
po.workerPool.logger.Info().Msg("Starting performance optimizer")
// Start performance monitoring
po.wg.Add(1)
go po.performanceMonitor(ctx)
// Start worker autoscaling if enabled
if po.config.EnableWorkerAutoscaling {
po.wg.Add(1)
go po.workerAutoscaler(ctx)
}
// Start GC optimization if enabled
if po.config.EnableGCOptimization {
po.wg.Add(1)
go po.gcOptimizer(ctx)
}
return nil
}
// Stop stops the performance optimizer
func (po *PerformanceOptimizer) Stop() error {
if !atomic.CompareAndSwapInt32(&po.isEnabled, 1, 0) {
return nil
}
// Log stopping
po.workerPool.logger.Info().Msg("Stopping performance optimizer")
close(po.shutdown)
po.wg.Wait()
return nil
}
// performanceMonitor continuously monitors system performance
func (po *PerformanceOptimizer) performanceMonitor(ctx context.Context) {
defer po.wg.Done()
ticker := time.NewTicker(po.config.PerformanceCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-po.shutdown:
return
case <-ticker.C:
metrics := po.collectPerformanceMetrics()
// Record metrics
po.metricsCollector.RecordMetric("performance.avg_latency", float64(metrics.AvgTaskLatency.Nanoseconds()), nil)
po.metricsCollector.RecordMetric("performance.throughput", metrics.Throughput, nil)
po.metricsCollector.RecordMetric("performance.memory_usage", float64(metrics.MemoryUsage), nil)
po.metricsCollector.RecordMetric("performance.gc_cycles", float64(metrics.GCCycles), nil)
po.metricsCollector.RecordMetric("performance.worker_utilization", metrics.WorkerUtilization, nil)
// Log performance issues
po.checkPerformanceThresholds(metrics)
}
}
}
// collectPerformanceMetrics collects current performance metrics
func (po *PerformanceOptimizer) collectPerformanceMetrics() PerformanceMetrics {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// Get task processing metrics from worker pool
poolMetrics := po.workerPool.Metrics()
// Calculate average latency
var avgLatency time.Duration
if poolMetrics.CompletedTasks > 0 {
avgLatency = time.Duration(poolMetrics.ExecutionTime/poolMetrics.CompletedTasks) * time.Millisecond
}
// Calculate throughput (tasks per second)
throughput := float64(poolMetrics.CompletedTasks) / time.Since(time.Now().Add(-time.Minute)).Seconds()
// Calculate error rate
var errorRate float64
if poolMetrics.TotalTasks > 0 {
errorRate = float64(poolMetrics.ErrorCount) / float64(poolMetrics.TotalTasks) * 100
}
// Calculate worker utilization (simplified)
workerCount := atomic.LoadInt32(&po.workerPool.numOfWorkers)
queueDepth := po.workerPool.GetQueueDepth()
var utilization float64
if workerCount > 0 {
utilization = float64(queueDepth) / float64(workerCount) * 100
if utilization > 100 {
utilization = 100
}
}
return PerformanceMetrics{
AvgTaskLatency: avgLatency,
Throughput: throughput,
MemoryUsage: int64(m.Alloc),
GCCycles: m.NumGC,
HeapObjects: m.HeapObjects,
WorkerUtilization: utilization,
QueueDepth: queueDepth,
ErrorRate: errorRate,
Timestamp: time.Now(),
}
}
// checkPerformanceThresholds checks if performance metrics exceed thresholds
func (po *PerformanceOptimizer) checkPerformanceThresholds(metrics PerformanceMetrics) {
issues := []string{}
// Check latency
if metrics.AvgTaskLatency > po.config.TargetLatency {
issues = append(issues, fmt.Sprintf("High latency: %v (target: %v)",
metrics.AvgTaskLatency, po.config.TargetLatency))
}
// Check memory usage
if metrics.MemoryUsage > po.config.MemoryThreshold {
issues = append(issues, fmt.Sprintf("High memory usage: %d bytes (threshold: %d)",
metrics.MemoryUsage, po.config.MemoryThreshold))
}
// Check worker utilization
if metrics.WorkerUtilization > 90 {
issues = append(issues, fmt.Sprintf("High worker utilization: %.2f%%", metrics.WorkerUtilization))
}
// Check error rate
if metrics.ErrorRate > 5 {
issues = append(issues, fmt.Sprintf("High error rate: %.2f%%", metrics.ErrorRate))
}
// Log issues
if len(issues) > 0 {
po.workerPool.logger.Warn().Msg(fmt.Sprintf("Performance issues detected: %v", issues))
}
}
// workerAutoscaler automatically scales workers based on load
func (po *PerformanceOptimizer) workerAutoscaler(ctx context.Context) {
defer po.wg.Done()
ticker := time.NewTicker(po.config.WorkerScalingInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-po.shutdown:
return
case <-ticker.C:
po.adjustWorkerCount()
}
}
}
// adjustWorkerCount adjusts the number of workers based on current load
func (po *PerformanceOptimizer) adjustWorkerCount() {
metrics := po.collectPerformanceMetrics()
currentWorkers := int(atomic.LoadInt32(&po.workerPool.numOfWorkers))
var targetWorkers int
// Scale based on latency
if metrics.AvgTaskLatency > po.config.TargetLatency*2 {
// High latency - add workers
targetWorkers = currentWorkers + 2
} else if metrics.AvgTaskLatency > po.config.TargetLatency {
// Moderate latency - add one worker
targetWorkers = currentWorkers + 1
} else if metrics.AvgTaskLatency < po.config.TargetLatency/2 && currentWorkers > po.config.MinWorkers {
// Low latency - reduce workers
targetWorkers = currentWorkers - 1
} else {
targetWorkers = currentWorkers
}
// Apply bounds
if targetWorkers < po.config.MinWorkers {
targetWorkers = po.config.MinWorkers
}
if targetWorkers > po.config.MaxWorkers {
targetWorkers = po.config.MaxWorkers
}
// Scale based on queue depth
queueDepth := metrics.QueueDepth
if queueDepth > currentWorkers*10 {
targetWorkers = min(targetWorkers+3, po.config.MaxWorkers)
} else if queueDepth > currentWorkers*5 {
targetWorkers = min(targetWorkers+1, po.config.MaxWorkers)
}
// Apply scaling
if targetWorkers != currentWorkers {
po.workerPool.logger.Info().Msg(fmt.Sprintf("Auto-scaling workers from %d to %d (queue: %d)",
currentWorkers, targetWorkers, queueDepth))
po.workerPool.AdjustWorkerCount(targetWorkers)
}
}
// gcOptimizer optimizes garbage collection
func (po *PerformanceOptimizer) gcOptimizer(ctx context.Context) {
defer po.wg.Done()
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-po.shutdown:
return
case <-ticker.C:
po.optimizeGC()
}
}
}
// optimizeGC performs garbage collection optimization
func (po *PerformanceOptimizer) optimizeGC() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// Calculate memory usage percentage
totalMemory := float64(m.Sys)
usedMemory := float64(m.Alloc)
usagePercent := (usedMemory / totalMemory) * 100
// Force GC if memory usage is high
if usagePercent > float64(po.config.GCTargetPercentage) {
po.workerPool.logger.Info().Msg(fmt.Sprintf("Forcing garbage collection, memory usage: %.2f%%", usagePercent))
runtime.GC()
}
}
// NewMemoryPool creates a new memory pool
func NewMemoryPool(size int) *MemoryPool {
return &MemoryPool{
size: size,
pool: sync.Pool{
New: func() interface{} {
return make([]byte, size)
},
},
}
}
// Get gets a buffer from the pool
func (mp *MemoryPool) Get() []byte {
return mp.pool.Get().([]byte)
}
// Put returns a buffer to the pool
func (mp *MemoryPool) Put(buf []byte) {
if cap(buf) == mp.size {
mp.pool.Put(buf[:0]) // Reset length but keep capacity
}
}
// PerformanceMonitor provides real-time performance monitoring
type PerformanceMonitor struct {
optimizer *PerformanceOptimizer
metrics chan PerformanceMetrics
stop chan struct{}
}
// NewPerformanceMonitor creates a new performance monitor
func NewPerformanceMonitor(optimizer *PerformanceOptimizer) *PerformanceMonitor {
return &PerformanceMonitor{
optimizer: optimizer,
metrics: make(chan PerformanceMetrics, 100),
stop: make(chan struct{}),
}
}
// Start starts the performance monitor
func (pm *PerformanceMonitor) Start() {
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-pm.stop:
return
case <-ticker.C:
select {
case pm.metrics <- pm.optimizer.collectPerformanceMetrics():
default:
// Channel is full, skip this metric
}
}
}
}()
}
// Stop stops the performance monitor
func (pm *PerformanceMonitor) Stop() {
close(pm.stop)
}
// GetMetrics returns the latest performance metrics
func (pm *PerformanceMonitor) GetMetrics() (PerformanceMetrics, bool) {
select {
case metrics := <-pm.metrics:
return metrics, true
default:
return PerformanceMetrics{}, false
}
}
// GetMetricsChannel returns the metrics channel
func (pm *PerformanceMonitor) GetMetricsChannel() <-chan PerformanceMetrics {
return pm.metrics
}
// PerformanceAlert represents a performance alert
type PerformanceAlert struct {
Type string `json:"type"`
Severity string `json:"severity"`
Message string `json:"message"`
Metrics PerformanceMetrics `json:"metrics"`
Threshold interface{} `json:"threshold"`
Timestamp time.Time `json:"timestamp"`
Details map[string]interface{} `json:"details,omitempty"`
}
// PerformanceAlerter manages performance alerts
type PerformanceAlerter struct {
alerts []PerformanceAlert
maxAlerts int
mu sync.RWMutex
}
// NewPerformanceAlerter creates a new performance alerter
func NewPerformanceAlerter(maxAlerts int) *PerformanceAlerter {
return &PerformanceAlerter{
alerts: make([]PerformanceAlert, 0),
maxAlerts: maxAlerts,
}
}
// AddAlert adds a performance alert
func (pa *PerformanceAlerter) AddAlert(alert PerformanceAlert) {
pa.mu.Lock()
defer pa.mu.Unlock()
alert.Timestamp = time.Now()
pa.alerts = append(pa.alerts, alert)
// Keep only recent alerts
if len(pa.alerts) > pa.maxAlerts {
pa.alerts = pa.alerts[len(pa.alerts)-pa.maxAlerts:]
}
}
// GetAlerts returns recent performance alerts
func (pa *PerformanceAlerter) GetAlerts(severity string, limit int) []PerformanceAlert {
pa.mu.RLock()
defer pa.mu.RUnlock()
var filtered []PerformanceAlert
for _, alert := range pa.alerts {
if severity == "" || alert.Severity == severity {
filtered = append(filtered, alert)
}
}
// Return most recent alerts
if len(filtered) > limit && limit > 0 {
start := len(filtered) - limit
return filtered[start:]
}
return filtered
}
// ClearAlerts clears all alerts
func (pa *PerformanceAlerter) ClearAlerts() {
pa.mu.Lock()
defer pa.mu.Unlock()
pa.alerts = pa.alerts[:0]
}
// PerformanceDashboard provides a web-based performance dashboard
type PerformanceDashboard struct {
optimizer *PerformanceOptimizer
alerter *PerformanceAlerter
monitor *PerformanceMonitor
}
// NewPerformanceDashboard creates a new performance dashboard
func NewPerformanceDashboard(optimizer *PerformanceOptimizer, alerter *PerformanceAlerter, monitor *PerformanceMonitor) *PerformanceDashboard {
return &PerformanceDashboard{
optimizer: optimizer,
alerter: alerter,
monitor: monitor,
}
}
// GetDashboardData returns data for the performance dashboard
func (pd *PerformanceDashboard) GetDashboardData() map[string]interface{} {
metrics, hasMetrics := pd.monitor.GetMetrics()
alerts := pd.alerter.GetAlerts("", 10)
data := map[string]interface{}{
"current_metrics": metrics,
"has_metrics": hasMetrics,
"recent_alerts": alerts,
"config": pd.optimizer.config,
"timestamp": time.Now(),
}
return data
}
// OptimizeForHighLoad optimizes the system for high load scenarios
func (po *PerformanceOptimizer) OptimizeForHighLoad() {
po.workerPool.logger.Info().Msg("Optimizing for high load")
// Increase worker count
currentWorkers := int(atomic.LoadInt32(&po.workerPool.numOfWorkers))
targetWorkers := min(currentWorkers*2, po.config.MaxWorkers)
if targetWorkers > currentWorkers {
po.workerPool.logger.Info().Msg(fmt.Sprintf("Scaling up workers for high load, from %d to %d", currentWorkers, targetWorkers))
po.workerPool.AdjustWorkerCount(targetWorkers)
}
// Force garbage collection
runtime.GC()
// Adjust batch size for better throughput
po.workerPool.SetBatchSize(10)
}
// OptimizeForLowLoad optimizes the system for low load scenarios
func (po *PerformanceOptimizer) OptimizeForLowLoad() {
po.workerPool.logger.Info().Msg("Optimizing for low load")
// Reduce worker count
currentWorkers := int(atomic.LoadInt32(&po.workerPool.numOfWorkers))
targetWorkers := max(currentWorkers/2, po.config.MinWorkers)
if targetWorkers < currentWorkers {
po.workerPool.logger.Info().Msg(fmt.Sprintf("Scaling down workers for low load, from %d to %d", currentWorkers, targetWorkers))
po.workerPool.AdjustWorkerCount(targetWorkers)
}
// Reduce batch size
po.workerPool.SetBatchSize(1)
}
// GetOptimizationRecommendations returns optimization recommendations
func (po *PerformanceOptimizer) GetOptimizationRecommendations() []string {
metrics := po.collectPerformanceMetrics()
recommendations := []string{}
if metrics.AvgTaskLatency > po.config.TargetLatency {
recommendations = append(recommendations,
"Consider increasing worker count to reduce latency")
}
if metrics.MemoryUsage > po.config.MemoryThreshold {
recommendations = append(recommendations,
"Consider increasing memory limits or optimizing memory usage")
}
if metrics.WorkerUtilization > 90 {
recommendations = append(recommendations,
"High worker utilization detected, consider scaling up")
}
if metrics.ErrorRate > 5 {
recommendations = append(recommendations,
"High error rate detected, check for systemic issues")
}
if len(recommendations) == 0 {
recommendations = append(recommendations, "System performance is optimal")
}
return recommendations
}

118
pool.go
View File

@@ -22,6 +22,124 @@ type Callback func(ctx context.Context, result Result) error
// CompletionCallback is called when the pool completes a graceful shutdown.
type CompletionCallback func()
// Enhanced Retry Mechanisms with Exponential Backoff and Jitter
type RetryStrategy interface {
NextDelay(attempt int) time.Duration
ShouldRetry(attempt int, err error) bool
MaxAttempts() int
}
type ExponentialBackoffRetry struct {
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
Jitter float64
maxAttempts int
}
func NewExponentialBackoffRetry(initialDelay, maxDelay time.Duration, multiplier, jitter float64, maxAttempts int) *ExponentialBackoffRetry {
return &ExponentialBackoffRetry{
InitialDelay: initialDelay,
MaxDelay: maxDelay,
Multiplier: multiplier,
Jitter: jitter,
maxAttempts: maxAttempts,
}
}
func (e *ExponentialBackoffRetry) NextDelay(attempt int) time.Duration {
if attempt <= 0 {
return e.InitialDelay
}
delay := float64(e.InitialDelay) * float64(attempt) * e.Multiplier
if delay > float64(e.MaxDelay) {
delay = float64(e.MaxDelay)
}
// Add jitter
jitterRange := delay * e.Jitter
jitter := (rand.Float64() - 0.5) * 2 * jitterRange
delay += jitter
if delay < 0 {
delay = 0
}
return time.Duration(delay)
}
func (e *ExponentialBackoffRetry) ShouldRetry(attempt int, err error) bool {
if attempt >= e.maxAttempts {
return false
}
// Don't retry certain types of errors
if err != nil {
errStr := err.Error()
// Don't retry authentication errors, permission errors, etc.
if strings.Contains(errStr, "unauthorized") ||
strings.Contains(errStr, "forbidden") ||
strings.Contains(errStr, "authentication") {
return false
}
}
return true
}
func (e *ExponentialBackoffRetry) MaxAttempts() int {
return e.maxAttempts
}
// Circuit Breaker Integration with Retry
type RetryWithCircuitBreaker struct {
retryStrategy RetryStrategy
circuitBreaker *EnhancedCircuitBreaker
}
func NewRetryWithCircuitBreaker(retryStrategy RetryStrategy, circuitBreaker *EnhancedCircuitBreaker) *RetryWithCircuitBreaker {
return &RetryWithCircuitBreaker{
retryStrategy: retryStrategy,
circuitBreaker: circuitBreaker,
}
}
func (r *RetryWithCircuitBreaker) ExecuteWithRetry(operation func() error) error {
attempt := 0
for {
// Check circuit breaker
if r.circuitBreaker != nil {
err := r.circuitBreaker.Call(operation)
if err != nil {
if !r.retryStrategy.ShouldRetry(attempt, err) {
return err
}
} else {
return nil // Success
}
} else {
err := operation()
if err == nil {
return nil // Success
}
if !r.retryStrategy.ShouldRetry(attempt, err) {
return err
}
}
attempt++
if attempt >= r.retryStrategy.MaxAttempts() {
return fmt.Errorf("max retry attempts exceeded")
}
delay := r.retryStrategy.NextDelay(attempt)
Logger.Info().Msgf("Retrying operation in %v (attempt %d/%d)", delay, attempt, r.retryStrategy.MaxAttempts())
time.Sleep(delay)
}
}
// Metrics holds cumulative pool metrics.
type Metrics struct {
TotalTasks int64 // total number of tasks processed

View File

@@ -445,7 +445,7 @@ func generateClientSideValidation(fieldPath string, validation ValidationInfo, a
var reqField = document.querySelector('[name="%s"]');
if (depField && depField.value && (!reqField || !reqField.value)) {
return 'Field %s is required when %s has a value';
}`, depField, reqField, reqField, depField))
}`, depField, reqField, depField, reqField, reqField, depField))
}
}
}

680
security.go Normal file
View File

@@ -0,0 +1,680 @@
package mq
import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"net"
"strings"
"sync"
"time"
)
// SecurityManager handles authentication, authorization, and security policies
type SecurityManager struct {
authProviders map[string]AuthProvider
roleManager *RoleManager
rateLimiter *SecurityRateLimiter
auditLogger *AuditLogger
sessionManager *SessionManager
encryptionKey []byte
mu sync.RWMutex
}
// AuthProvider interface for different authentication methods
type AuthProvider interface {
Name() string
Authenticate(ctx context.Context, credentials map[string]interface{}) (*User, error)
ValidateToken(token string) (*User, error)
}
// User represents an authenticated user
type User struct {
ID string `json:"id"`
Username string `json:"username"`
Roles []string `json:"roles"`
Permissions []string `json:"permissions"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
LastLoginAt *time.Time `json:"last_login_at,omitempty"`
}
// RoleManager manages user roles and permissions
type RoleManager struct {
roles map[string]*Role
permissions map[string]*Permission
mu sync.RWMutex
}
// Role represents a user role with associated permissions
type Role struct {
Name string `json:"name"`
Description string `json:"description"`
Permissions []string `json:"permissions"`
CreatedAt time.Time `json:"created_at"`
}
// Permission represents a specific permission
type Permission struct {
Name string `json:"name"`
Resource string `json:"resource"`
Action string `json:"action"`
Description string `json:"description"`
CreatedAt time.Time `json:"created_at"`
}
// SecurityRateLimiter implements rate limiting for security operations
type SecurityRateLimiter struct {
attempts map[string]*RateLimitEntry
maxAttempts int
window time.Duration
mu sync.RWMutex
}
// RateLimitEntry tracks rate limiting for a specific key
type RateLimitEntry struct {
Count int
WindowStart time.Time
}
// AuditLogger logs security-related events
type AuditLogger struct {
events []AuditEvent
maxEvents int
mu sync.RWMutex
}
// AuditEvent represents a security audit event
type AuditEvent struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
EventType string `json:"event_type"`
UserID string `json:"user_id,omitempty"`
Resource string `json:"resource"`
Action string `json:"action"`
IPAddress string `json:"ip_address,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
Success bool `json:"success"`
Details map[string]interface{} `json:"details,omitempty"`
}
// SessionManager manages user sessions
type SessionManager struct {
sessions map[string]*Session
maxAge time.Duration
mu sync.RWMutex
}
// Session represents a user session
type Session struct {
ID string `json:"id"`
UserID string `json:"user_id"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
IPAddress string `json:"ip_address"`
UserAgent string `json:"user_agent"`
Data map[string]interface{} `json:"data,omitempty"`
}
// NewSecurityManager creates a new security manager
func NewSecurityManager() *SecurityManager {
key := make([]byte, 32)
rand.Read(key)
return &SecurityManager{
authProviders: make(map[string]AuthProvider),
roleManager: NewRoleManager(),
rateLimiter: NewSecurityRateLimiter(5, time.Minute*15), // 5 attempts per 15 minutes
auditLogger: NewAuditLogger(10000),
sessionManager: NewSessionManager(time.Hour * 24), // 24 hour sessions
encryptionKey: key,
}
}
// NewRoleManager creates a new role manager
func NewRoleManager() *RoleManager {
rm := &RoleManager{
roles: make(map[string]*Role),
permissions: make(map[string]*Permission),
}
// Initialize default permissions
rm.AddPermission(&Permission{
Name: "task.publish",
Resource: "task",
Action: "publish",
Description: "Publish tasks to queues",
CreatedAt: time.Now(),
})
rm.AddPermission(&Permission{
Name: "task.consume",
Resource: "task",
Action: "consume",
Description: "Consume tasks from queues",
CreatedAt: time.Now(),
})
rm.AddPermission(&Permission{
Name: "queue.manage",
Resource: "queue",
Action: "manage",
Description: "Manage queues",
CreatedAt: time.Now(),
})
rm.AddPermission(&Permission{
Name: "admin.system",
Resource: "system",
Action: "admin",
Description: "System administration",
CreatedAt: time.Now(),
})
// Initialize default roles
rm.AddRole(&Role{
Name: "publisher",
Description: "Can publish tasks",
Permissions: []string{"task.publish"},
CreatedAt: time.Now(),
})
rm.AddRole(&Role{
Name: "consumer",
Description: "Can consume tasks",
Permissions: []string{"task.consume"},
CreatedAt: time.Now(),
})
rm.AddRole(&Role{
Name: "admin",
Description: "Full system access",
Permissions: []string{"task.publish", "task.consume", "queue.manage", "admin.system"},
CreatedAt: time.Now(),
})
return rm
}
// AddPermission adds a permission to the role manager
func (rm *RoleManager) AddPermission(perm *Permission) {
rm.mu.Lock()
defer rm.mu.Unlock()
rm.permissions[perm.Name] = perm
}
// AddRole adds a role to the role manager
func (rm *RoleManager) AddRole(role *Role) {
rm.mu.Lock()
defer rm.mu.Unlock()
rm.roles[role.Name] = role
}
// HasPermission checks if a user has a specific permission
func (rm *RoleManager) HasPermission(user *User, permission string) bool {
rm.mu.RLock()
defer rm.mu.RUnlock()
for _, roleName := range user.Roles {
if role, exists := rm.roles[roleName]; exists {
for _, perm := range role.Permissions {
if perm == permission {
return true
}
}
}
}
return false
}
// GetUserPermissions returns all permissions for a user
func (rm *RoleManager) GetUserPermissions(user *User) []string {
rm.mu.RLock()
defer rm.mu.RUnlock()
permissions := make(map[string]bool)
for _, roleName := range user.Roles {
if role, exists := rm.roles[roleName]; exists {
for _, perm := range role.Permissions {
permissions[perm] = true
}
}
}
result := make([]string, 0, len(permissions))
for perm := range permissions {
result = append(result, perm)
}
return result
}
// NewSecurityRateLimiter creates a new security rate limiter
func NewSecurityRateLimiter(maxAttempts int, window time.Duration) *SecurityRateLimiter {
return &SecurityRateLimiter{
attempts: make(map[string]*RateLimitEntry),
maxAttempts: maxAttempts,
window: window,
}
}
// IsAllowed checks if an action is allowed based on rate limiting
func (rl *SecurityRateLimiter) IsAllowed(key string) bool {
rl.mu.Lock()
defer rl.mu.Unlock()
now := time.Now()
entry, exists := rl.attempts[key]
if !exists {
rl.attempts[key] = &RateLimitEntry{
Count: 1,
WindowStart: now,
}
return true
}
// Check if we're in a new window
if now.Sub(entry.WindowStart) >= rl.window {
entry.Count = 1
entry.WindowStart = now
return true
}
// Check if we've exceeded the limit
if entry.Count >= rl.maxAttempts {
return false
}
entry.Count++
return true
}
// Reset resets the rate limit for a key
func (rl *SecurityRateLimiter) Reset(key string) {
rl.mu.Lock()
defer rl.mu.Unlock()
delete(rl.attempts, key)
}
// NewAuditLogger creates a new audit logger
func NewAuditLogger(maxEvents int) *AuditLogger {
return &AuditLogger{
events: make([]AuditEvent, 0),
maxEvents: maxEvents,
}
}
// LogEvent logs a security event
func (al *AuditLogger) LogEvent(event AuditEvent) {
al.mu.Lock()
defer al.mu.Unlock()
event.ID = generateID()
event.Timestamp = time.Now()
al.events = append(al.events, event)
// Keep only the most recent events
if len(al.events) > al.maxEvents {
al.events = al.events[len(al.events)-al.maxEvents:]
}
}
// GetEvents returns audit events with optional filtering
func (al *AuditLogger) GetEvents(userID, eventType string, limit int) []AuditEvent {
al.mu.RLock()
defer al.mu.RUnlock()
var filtered []AuditEvent
for _, event := range al.events {
if (userID == "" || event.UserID == userID) &&
(eventType == "" || event.EventType == eventType) {
filtered = append(filtered, event)
}
}
// Return most recent events
if len(filtered) > limit && limit > 0 {
start := len(filtered) - limit
return filtered[start:]
}
return filtered
}
// NewSessionManager creates a new session manager
func NewSessionManager(maxAge time.Duration) *SessionManager {
sm := &SessionManager{
sessions: make(map[string]*Session),
maxAge: maxAge,
}
// Start cleanup routine
go sm.cleanupRoutine()
return sm
}
// CreateSession creates a new session for a user
func (sm *SessionManager) CreateSession(userID, ipAddress, userAgent string) *Session {
sm.mu.Lock()
defer sm.mu.Unlock()
session := &Session{
ID: generateID(),
UserID: userID,
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(sm.maxAge),
IPAddress: ipAddress,
UserAgent: userAgent,
Data: make(map[string]interface{}),
}
sm.sessions[session.ID] = session
return session
}
// GetSession retrieves a session by ID
func (sm *SessionManager) GetSession(sessionID string) (*Session, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
session, exists := sm.sessions[sessionID]
if !exists {
return nil, false
}
// Check if session has expired
if time.Now().After(session.ExpiresAt) {
return nil, false
}
return session, true
}
// DeleteSession deletes a session
func (sm *SessionManager) DeleteSession(sessionID string) {
sm.mu.Lock()
defer sm.mu.Unlock()
delete(sm.sessions, sessionID)
}
// cleanupRoutine periodically cleans up expired sessions
func (sm *SessionManager) cleanupRoutine() {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for range ticker.C {
sm.mu.Lock()
now := time.Now()
for id, session := range sm.sessions {
if now.After(session.ExpiresAt) {
delete(sm.sessions, id)
}
}
sm.mu.Unlock()
}
}
// AddAuthProvider adds an authentication provider
func (sm *SecurityManager) AddAuthProvider(provider AuthProvider) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.authProviders[provider.Name()] = provider
}
// Authenticate authenticates a user using available providers
func (sm *SecurityManager) Authenticate(ctx context.Context, credentials map[string]interface{}) (*User, error) {
sm.mu.RLock()
providers := make(map[string]AuthProvider)
for name, provider := range sm.authProviders {
providers[name] = provider
}
sm.mu.RUnlock()
var lastErr error
for _, provider := range providers {
user, err := provider.Authenticate(ctx, credentials)
if err == nil {
// Log successful authentication
sm.auditLogger.LogEvent(AuditEvent{
EventType: "authentication",
UserID: user.ID,
Action: "login",
Success: true,
Details: map[string]interface{}{
"provider": provider.Name(),
},
})
// Update user permissions
user.Permissions = sm.roleManager.GetUserPermissions(user)
return user, nil
}
lastErr = err
}
// Log failed authentication
sm.auditLogger.LogEvent(AuditEvent{
EventType: "authentication",
Action: "login",
Success: false,
Details: map[string]interface{}{
"error": lastErr.Error(),
},
})
return nil, fmt.Errorf("authentication failed: %w", lastErr)
}
// Authorize checks if a user is authorized for an action
func (sm *SecurityManager) Authorize(user *User, resource, action string) error {
permission := fmt.Sprintf("%s.%s", resource, action)
if !sm.roleManager.HasPermission(user, permission) {
// Log authorization failure
sm.auditLogger.LogEvent(AuditEvent{
EventType: "authorization",
UserID: user.ID,
Resource: resource,
Action: action,
Success: false,
})
return fmt.Errorf("user %s does not have permission %s", user.Username, permission)
}
// Log successful authorization
sm.auditLogger.LogEvent(AuditEvent{
EventType: "authorization",
UserID: user.ID,
Resource: resource,
Action: action,
Success: true,
})
return nil
}
// ValidateSession validates a session token
func (sm *SecurityManager) ValidateSession(sessionID string) (*User, error) {
session, exists := sm.sessionManager.GetSession(sessionID)
if !exists {
return nil, fmt.Errorf("invalid session")
}
// Create a user object from session data
user := &User{
ID: session.UserID,
Username: session.UserID, // In a real implementation, you'd fetch from database
}
// Update user permissions
user.Permissions = sm.roleManager.GetUserPermissions(user)
return user, nil
}
// CheckRateLimit checks if an action is rate limited
func (sm *SecurityManager) CheckRateLimit(key string) error {
if !sm.rateLimiter.IsAllowed(key) {
sm.auditLogger.LogEvent(AuditEvent{
EventType: "rate_limit",
Action: "exceeded",
Success: false,
Details: map[string]interface{}{
"key": key,
},
})
return fmt.Errorf("rate limit exceeded for key: %s", key)
}
return nil
}
// Encrypt encrypts data using the security manager's key
func (sm *SecurityManager) Encrypt(data []byte) ([]byte, error) {
// Simple encryption using SHA256 hash of key + data
// In production, use proper encryption like AES
hash := sha256.Sum256(append(sm.encryptionKey, data...))
return hash[:], nil
}
// Decrypt decrypts data (placeholder for proper decryption)
func (sm *SecurityManager) Decrypt(data []byte) ([]byte, error) {
// Placeholder - in production, implement proper decryption
return data, nil
}
// BasicAuthProvider implements basic username/password authentication
type BasicAuthProvider struct {
users map[string]*User
mu sync.RWMutex
}
func NewBasicAuthProvider() *BasicAuthProvider {
return &BasicAuthProvider{
users: make(map[string]*User),
}
}
func (bap *BasicAuthProvider) Name() string {
return "basic"
}
func (bap *BasicAuthProvider) Authenticate(ctx context.Context, credentials map[string]interface{}) (*User, error) {
username, ok := credentials["username"].(string)
if !ok {
return nil, fmt.Errorf("username required")
}
password, ok := credentials["password"].(string)
if !ok {
return nil, fmt.Errorf("password required")
}
bap.mu.RLock()
user, exists := bap.users[username]
bap.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("user not found")
}
// In production, compare hashed passwords
if password != "password" { // Placeholder
return nil, fmt.Errorf("invalid password")
}
// Update last login
now := time.Now()
user.LastLoginAt = &now
return user, nil
}
func (bap *BasicAuthProvider) ValidateToken(token string) (*User, error) {
// Basic token validation - in production, use JWT or similar
parts := strings.Split(token, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid token format")
}
username := parts[0]
return bap.Authenticate(context.Background(), map[string]interface{}{
"username": username,
"password": "token", // Placeholder
})
}
func (bap *BasicAuthProvider) AddUser(user *User, password string) error {
bap.mu.Lock()
defer bap.mu.Unlock()
// In production, hash the password
user.CreatedAt = time.Now()
bap.users[user.Username] = user
return nil
}
// generateID generates a random ID
func generateID() string {
bytes := make([]byte, 16)
rand.Read(bytes)
return hex.EncodeToString(bytes)
}
// SecurityMiddleware provides security middleware for HTTP handlers
type SecurityMiddleware struct {
securityManager *SecurityManager
}
// NewSecurityMiddleware creates a new security middleware
func NewSecurityMiddleware(sm *SecurityManager) *SecurityMiddleware {
return &SecurityMiddleware{
securityManager: sm,
}
}
// AuthenticateRequest authenticates a request with credentials
func (sm *SecurityMiddleware) AuthenticateRequest(credentials map[string]interface{}, ipAddress string) (*User, error) {
user, err := sm.securityManager.Authenticate(context.Background(), credentials)
if err != nil {
// Log failed authentication attempt
sm.securityManager.auditLogger.LogEvent(AuditEvent{
EventType: "authentication",
Action: "login",
Success: false,
Details: map[string]interface{}{
"ip_address": ipAddress,
"error": err.Error(),
},
})
return nil, err
}
return user, nil
}
// AuthorizeRequest authorizes a request
func (sm *SecurityMiddleware) AuthorizeRequest(user *User, resource, action string) error {
return sm.securityManager.Authorize(user, resource, action)
}
// GetClientIP gets the real client IP from a network connection
func GetClientIP(conn net.Conn) string {
if conn == nil {
return ""
}
host, _, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
return conn.RemoteAddr().String()
}
return host
}

View File

@@ -244,7 +244,7 @@ func (s *Socket) Close() error {
return nil
}
defer slog.Debug(s.ID(), "disconnected")
defer slog.Debug("Socket disconnected", "socket_id", s.ID())
err := s.ws.Close()
if err != nil {