Files
mq/config_manager.go
Oarkflow 271beed429 update
2025-08-02 16:17:20 +05:45

984 lines
32 KiB
Go

package mq
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"
"github.com/oarkflow/mq/logger"
)
// ConfigManager handles dynamic configuration management
type ConfigManager struct {
config *ProductionConfig
watchers []ConfigWatcher
mu sync.RWMutex
logger logger.Logger
configFile string
}
// ProductionConfig contains all production configuration
type ProductionConfig struct {
Broker BrokerConfig `json:"broker"`
Consumer ConsumerConfig `json:"consumer"`
Publisher PublisherConfig `json:"publisher"`
Pool PoolConfig `json:"pool"`
Security SecurityConfig `json:"security"`
Monitoring MonitoringConfig `json:"monitoring"`
Persistence PersistenceConfig `json:"persistence"`
Clustering ClusteringConfig `json:"clustering"`
RateLimit RateLimitConfig `json:"rate_limit"`
LastUpdated time.Time `json:"last_updated"`
}
// BrokerConfig contains broker-specific configuration
type BrokerConfig struct {
Address string `json:"address"`
Port int `json:"port"`
MaxConnections int `json:"max_connections"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
ReadTimeout time.Duration `json:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout"`
IdleTimeout time.Duration `json:"idle_timeout"`
KeepAlive bool `json:"keep_alive"`
KeepAlivePeriod time.Duration `json:"keep_alive_period"`
MaxQueueDepth int `json:"max_queue_depth"`
EnableDeadLetter bool `json:"enable_dead_letter"`
DeadLetterMaxRetries int `json:"dead_letter_max_retries"`
EnableMetrics bool `json:"enable_metrics"`
MetricsInterval time.Duration `json:"metrics_interval"`
GracefulShutdown time.Duration `json:"graceful_shutdown"`
MessageTTL time.Duration `json:"message_ttl"`
Headers map[string]string `json:"headers"`
}
// ConsumerConfig contains consumer-specific configuration
type ConsumerConfig struct {
MaxRetries int `json:"max_retries"`
InitialDelay time.Duration `json:"initial_delay"`
MaxBackoff time.Duration `json:"max_backoff"`
JitterPercent float64 `json:"jitter_percent"`
EnableReconnect bool `json:"enable_reconnect"`
ReconnectInterval time.Duration `json:"reconnect_interval"`
HealthCheckInterval time.Duration `json:"health_check_interval"`
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
TaskTimeout time.Duration `json:"task_timeout"`
EnableDeduplication bool `json:"enable_deduplication"`
DeduplicationWindow time.Duration `json:"deduplication_window"`
EnablePriorityQueue bool `json:"enable_priority_queue"`
EnableHTTPAPI bool `json:"enable_http_api"`
HTTPAPIPort int `json:"http_api_port"`
EnableCircuitBreaker bool `json:"enable_circuit_breaker"`
CircuitBreakerThreshold int `json:"circuit_breaker_threshold"`
CircuitBreakerTimeout time.Duration `json:"circuit_breaker_timeout"`
}
// PublisherConfig contains publisher-specific configuration
type PublisherConfig struct {
MaxRetries int `json:"max_retries"`
InitialDelay time.Duration `json:"initial_delay"`
MaxBackoff time.Duration `json:"max_backoff"`
JitterPercent float64 `json:"jitter_percent"`
ConnectionPoolSize int `json:"connection_pool_size"`
PublishTimeout time.Duration `json:"publish_timeout"`
EnableBatching bool `json:"enable_batching"`
BatchSize int `json:"batch_size"`
BatchTimeout time.Duration `json:"batch_timeout"`
EnableCompression bool `json:"enable_compression"`
CompressionLevel int `json:"compression_level"`
EnableAsync bool `json:"enable_async"`
AsyncBufferSize int `json:"async_buffer_size"`
EnableOrderedDelivery bool `json:"enable_ordered_delivery"`
}
// PoolConfig contains worker pool configuration
type PoolConfig struct {
MinWorkers int `json:"min_workers"`
MaxWorkers int `json:"max_workers"`
QueueSize int `json:"queue_size"`
MaxMemoryLoad int64 `json:"max_memory_load"`
TaskTimeout time.Duration `json:"task_timeout"`
IdleWorkerTimeout time.Duration `json:"idle_worker_timeout"`
EnableDynamicScaling bool `json:"enable_dynamic_scaling"`
ScalingFactor float64 `json:"scaling_factor"`
ScalingInterval time.Duration `json:"scaling_interval"`
MaxQueueWaitTime time.Duration `json:"max_queue_wait_time"`
EnableWorkStealing bool `json:"enable_work_stealing"`
EnablePriorityScheduling bool `json:"enable_priority_scheduling"`
GracefulShutdownTimeout time.Duration `json:"graceful_shutdown_timeout"`
}
// SecurityConfig contains security-related configuration
type SecurityConfig struct {
EnableTLS bool `json:"enable_tls"`
TLSCertPath string `json:"tls_cert_path"`
TLSKeyPath string `json:"tls_key_path"`
TLSCAPath string `json:"tls_ca_path"`
TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
EnableAuthentication bool `json:"enable_authentication"`
AuthenticationMethod string `json:"authentication_method"` // "basic", "jwt", "oauth"
EnableAuthorization bool `json:"enable_authorization"`
EnableEncryption bool `json:"enable_encryption"`
EncryptionKey string `json:"encryption_key"`
EnableAuditLog bool `json:"enable_audit_log"`
AuditLogPath string `json:"audit_log_path"`
SessionTimeout time.Duration `json:"session_timeout"`
MaxLoginAttempts int `json:"max_login_attempts"`
LockoutDuration time.Duration `json:"lockout_duration"`
}
// MonitoringConfig contains monitoring and observability configuration
type MonitoringConfig struct {
EnableMetrics bool `json:"enable_metrics"`
MetricsPort int `json:"metrics_port"`
MetricsPath string `json:"metrics_path"`
EnableHealthCheck bool `json:"enable_health_check"`
HealthCheckPort int `json:"health_check_port"`
HealthCheckPath string `json:"health_check_path"`
HealthCheckInterval time.Duration `json:"health_check_interval"`
EnableTracing bool `json:"enable_tracing"`
TracingEndpoint string `json:"tracing_endpoint"`
TracingSampleRate float64 `json:"tracing_sample_rate"`
EnableLogging bool `json:"enable_logging"`
LogLevel string `json:"log_level"`
LogFormat string `json:"log_format"` // "json", "text"
LogOutput string `json:"log_output"` // "stdout", "file", "syslog"
LogFilePath string `json:"log_file_path"`
LogMaxSize int `json:"log_max_size"` // MB
LogMaxBackups int `json:"log_max_backups"`
LogMaxAge int `json:"log_max_age"` // days
EnableProfiling bool `json:"enable_profiling"`
ProfilingPort int `json:"profiling_port"`
}
// PersistenceConfig contains data persistence configuration
type PersistenceConfig struct {
EnablePersistence bool `json:"enable_persistence"`
StorageType string `json:"storage_type"` // "memory", "file", "redis", "postgres", "mysql"
ConnectionString string `json:"connection_string"`
MaxConnections int `json:"max_connections"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
RetentionPeriod time.Duration `json:"retention_period"`
CleanupInterval time.Duration `json:"cleanup_interval"`
BackupEnabled bool `json:"backup_enabled"`
BackupInterval time.Duration `json:"backup_interval"`
BackupPath string `json:"backup_path"`
CompressionEnabled bool `json:"compression_enabled"`
EncryptionEnabled bool `json:"encryption_enabled"`
ReplicationEnabled bool `json:"replication_enabled"`
ReplicationNodes []string `json:"replication_nodes"`
}
// ClusteringConfig contains clustering configuration
type ClusteringConfig struct {
EnableClustering bool `json:"enable_clustering"`
NodeID string `json:"node_id"`
ClusterNodes []string `json:"cluster_nodes"`
DiscoveryMethod string `json:"discovery_method"` // "static", "consul", "etcd", "k8s"
DiscoveryEndpoint string `json:"discovery_endpoint"`
HeartbeatInterval time.Duration `json:"heartbeat_interval"`
ElectionTimeout time.Duration `json:"election_timeout"`
EnableLoadBalancing bool `json:"enable_load_balancing"`
LoadBalancingStrategy string `json:"load_balancing_strategy"` // "round_robin", "least_connections", "hash"
EnableFailover bool `json:"enable_failover"`
FailoverTimeout time.Duration `json:"failover_timeout"`
EnableReplication bool `json:"enable_replication"`
ReplicationFactor int `json:"replication_factor"`
ConsistencyLevel string `json:"consistency_level"` // "weak", "strong", "eventual"
}
// RateLimitConfig contains rate limiting configuration
type RateLimitConfig struct {
EnableBrokerRateLimit bool `json:"enable_broker_rate_limit"`
BrokerRate int `json:"broker_rate"` // requests per second
BrokerBurst int `json:"broker_burst"`
EnableConsumerRateLimit bool `json:"enable_consumer_rate_limit"`
ConsumerRate int `json:"consumer_rate"`
ConsumerBurst int `json:"consumer_burst"`
EnablePublisherRateLimit bool `json:"enable_publisher_rate_limit"`
PublisherRate int `json:"publisher_rate"`
PublisherBurst int `json:"publisher_burst"`
EnablePerQueueRateLimit bool `json:"enable_per_queue_rate_limit"`
PerQueueRate int `json:"per_queue_rate"`
PerQueueBurst int `json:"per_queue_burst"`
}
// Custom unmarshaling to handle duration strings
func (c *ProductionConfig) UnmarshalJSON(data []byte) error {
type Alias ProductionConfig
aux := &struct {
*Alias
LastUpdated string `json:"last_updated"`
}{
Alias: (*Alias)(c),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
if aux.LastUpdated != "" {
if t, err := time.Parse(time.RFC3339, aux.LastUpdated); err == nil {
c.LastUpdated = t
}
}
return nil
}
func (b *BrokerConfig) UnmarshalJSON(data []byte) error {
type Alias BrokerConfig
aux := &struct {
*Alias
ConnectionTimeout string `json:"connection_timeout"`
ReadTimeout string `json:"read_timeout"`
WriteTimeout string `json:"write_timeout"`
IdleTimeout string `json:"idle_timeout"`
KeepAlivePeriod string `json:"keep_alive_period"`
MetricsInterval string `json:"metrics_interval"`
GracefulShutdown string `json:"graceful_shutdown"`
MessageTTL string `json:"message_ttl"`
}{
Alias: (*Alias)(b),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.ConnectionTimeout != "" {
if b.ConnectionTimeout, err = time.ParseDuration(aux.ConnectionTimeout); err != nil {
return fmt.Errorf("invalid connection_timeout: %w", err)
}
}
if aux.ReadTimeout != "" {
if b.ReadTimeout, err = time.ParseDuration(aux.ReadTimeout); err != nil {
return fmt.Errorf("invalid read_timeout: %w", err)
}
}
if aux.WriteTimeout != "" {
if b.WriteTimeout, err = time.ParseDuration(aux.WriteTimeout); err != nil {
return fmt.Errorf("invalid write_timeout: %w", err)
}
}
if aux.IdleTimeout != "" {
if b.IdleTimeout, err = time.ParseDuration(aux.IdleTimeout); err != nil {
return fmt.Errorf("invalid idle_timeout: %w", err)
}
}
if aux.KeepAlivePeriod != "" {
if b.KeepAlivePeriod, err = time.ParseDuration(aux.KeepAlivePeriod); err != nil {
return fmt.Errorf("invalid keep_alive_period: %w", err)
}
}
if aux.MetricsInterval != "" {
if b.MetricsInterval, err = time.ParseDuration(aux.MetricsInterval); err != nil {
return fmt.Errorf("invalid metrics_interval: %w", err)
}
}
if aux.GracefulShutdown != "" {
if b.GracefulShutdown, err = time.ParseDuration(aux.GracefulShutdown); err != nil {
return fmt.Errorf("invalid graceful_shutdown: %w", err)
}
}
if aux.MessageTTL != "" {
if b.MessageTTL, err = time.ParseDuration(aux.MessageTTL); err != nil {
return fmt.Errorf("invalid message_ttl: %w", err)
}
}
return nil
}
func (c *ConsumerConfig) UnmarshalJSON(data []byte) error {
type Alias ConsumerConfig
aux := &struct {
*Alias
InitialDelay string `json:"initial_delay"`
MaxBackoff string `json:"max_backoff"`
ReconnectInterval string `json:"reconnect_interval"`
HealthCheckInterval string `json:"health_check_interval"`
TaskTimeout string `json:"task_timeout"`
DeduplicationWindow string `json:"deduplication_window"`
CircuitBreakerTimeout string `json:"circuit_breaker_timeout"`
}{
Alias: (*Alias)(c),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.InitialDelay != "" {
if c.InitialDelay, err = time.ParseDuration(aux.InitialDelay); err != nil {
return fmt.Errorf("invalid initial_delay: %w", err)
}
}
if aux.MaxBackoff != "" {
if c.MaxBackoff, err = time.ParseDuration(aux.MaxBackoff); err != nil {
return fmt.Errorf("invalid max_backoff: %w", err)
}
}
if aux.ReconnectInterval != "" {
if c.ReconnectInterval, err = time.ParseDuration(aux.ReconnectInterval); err != nil {
return fmt.Errorf("invalid reconnect_interval: %w", err)
}
}
if aux.HealthCheckInterval != "" {
if c.HealthCheckInterval, err = time.ParseDuration(aux.HealthCheckInterval); err != nil {
return fmt.Errorf("invalid health_check_interval: %w", err)
}
}
if aux.TaskTimeout != "" {
if c.TaskTimeout, err = time.ParseDuration(aux.TaskTimeout); err != nil {
return fmt.Errorf("invalid task_timeout: %w", err)
}
}
if aux.DeduplicationWindow != "" {
if c.DeduplicationWindow, err = time.ParseDuration(aux.DeduplicationWindow); err != nil {
return fmt.Errorf("invalid deduplication_window: %w", err)
}
}
if aux.CircuitBreakerTimeout != "" {
if c.CircuitBreakerTimeout, err = time.ParseDuration(aux.CircuitBreakerTimeout); err != nil {
return fmt.Errorf("invalid circuit_breaker_timeout: %w", err)
}
}
return nil
}
func (p *PublisherConfig) UnmarshalJSON(data []byte) error {
type Alias PublisherConfig
aux := &struct {
*Alias
InitialDelay string `json:"initial_delay"`
MaxBackoff string `json:"max_backoff"`
PublishTimeout string `json:"publish_timeout"`
BatchTimeout string `json:"batch_timeout"`
}{
Alias: (*Alias)(p),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.InitialDelay != "" {
if p.InitialDelay, err = time.ParseDuration(aux.InitialDelay); err != nil {
return fmt.Errorf("invalid initial_delay: %w", err)
}
}
if aux.MaxBackoff != "" {
if p.MaxBackoff, err = time.ParseDuration(aux.MaxBackoff); err != nil {
return fmt.Errorf("invalid max_backoff: %w", err)
}
}
if aux.PublishTimeout != "" {
if p.PublishTimeout, err = time.ParseDuration(aux.PublishTimeout); err != nil {
return fmt.Errorf("invalid publish_timeout: %w", err)
}
}
if aux.BatchTimeout != "" {
if p.BatchTimeout, err = time.ParseDuration(aux.BatchTimeout); err != nil {
return fmt.Errorf("invalid batch_timeout: %w", err)
}
}
return nil
}
func (p *PoolConfig) UnmarshalJSON(data []byte) error {
type Alias PoolConfig
aux := &struct {
*Alias
TaskTimeout string `json:"task_timeout"`
IdleTimeout string `json:"idle_timeout"`
ScalingInterval string `json:"scaling_interval"`
MaxQueueWaitTime string `json:"max_queue_wait_time"`
GracefulShutdownTimeout string `json:"graceful_shutdown_timeout"`
}{
Alias: (*Alias)(p),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.TaskTimeout != "" {
if p.TaskTimeout, err = time.ParseDuration(aux.TaskTimeout); err != nil {
return fmt.Errorf("invalid task_timeout: %w", err)
}
}
if aux.IdleTimeout != "" {
if p.IdleWorkerTimeout, err = time.ParseDuration(aux.IdleTimeout); err != nil {
return fmt.Errorf("invalid idle_timeout: %w", err)
}
}
if aux.ScalingInterval != "" {
if p.ScalingInterval, err = time.ParseDuration(aux.ScalingInterval); err != nil {
return fmt.Errorf("invalid scaling_interval: %w", err)
}
}
if aux.MaxQueueWaitTime != "" {
if p.MaxQueueWaitTime, err = time.ParseDuration(aux.MaxQueueWaitTime); err != nil {
return fmt.Errorf("invalid max_queue_wait_time: %w", err)
}
}
if aux.GracefulShutdownTimeout != "" {
if p.GracefulShutdownTimeout, err = time.ParseDuration(aux.GracefulShutdownTimeout); err != nil {
return fmt.Errorf("invalid graceful_shutdown_timeout: %w", err)
}
}
return nil
}
func (m *MonitoringConfig) UnmarshalJSON(data []byte) error {
type Alias MonitoringConfig
aux := &struct {
*Alias
HealthCheckInterval string `json:"health_check_interval"`
MetricsInterval string `json:"metrics_interval"`
RetentionPeriod string `json:"retention_period"`
}{
Alias: (*Alias)(m),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.HealthCheckInterval != "" {
if m.HealthCheckInterval, err = time.ParseDuration(aux.HealthCheckInterval); err != nil {
return fmt.Errorf("invalid health_check_interval: %w", err)
}
}
return nil
}
func (p *PersistenceConfig) UnmarshalJSON(data []byte) error {
type Alias PersistenceConfig
aux := &struct {
*Alias
ConnectionTimeout string `json:"connection_timeout"`
RetentionPeriod string `json:"retention_period"`
CleanupInterval string `json:"cleanup_interval"`
BackupInterval string `json:"backup_interval"`
}{
Alias: (*Alias)(p),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.ConnectionTimeout != "" {
if p.ConnectionTimeout, err = time.ParseDuration(aux.ConnectionTimeout); err != nil {
return fmt.Errorf("invalid connection_timeout: %w", err)
}
}
if aux.RetentionPeriod != "" {
if p.RetentionPeriod, err = time.ParseDuration(aux.RetentionPeriod); err != nil {
return fmt.Errorf("invalid retention_period: %w", err)
}
}
if aux.CleanupInterval != "" {
if p.CleanupInterval, err = time.ParseDuration(aux.CleanupInterval); err != nil {
return fmt.Errorf("invalid cleanup_interval: %w", err)
}
}
if aux.BackupInterval != "" {
if p.BackupInterval, err = time.ParseDuration(aux.BackupInterval); err != nil {
return fmt.Errorf("invalid backup_interval: %w", err)
}
}
return nil
}
func (c *ClusteringConfig) UnmarshalJSON(data []byte) error {
type Alias ClusteringConfig
aux := &struct {
*Alias
HeartbeatInterval string `json:"heartbeat_interval"`
ElectionTimeout string `json:"election_timeout"`
FailoverTimeout string `json:"failover_timeout"`
}{
Alias: (*Alias)(c),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.HeartbeatInterval != "" {
if c.HeartbeatInterval, err = time.ParseDuration(aux.HeartbeatInterval); err != nil {
return fmt.Errorf("invalid heartbeat_interval: %w", err)
}
}
if aux.ElectionTimeout != "" {
if c.ElectionTimeout, err = time.ParseDuration(aux.ElectionTimeout); err != nil {
return fmt.Errorf("invalid election_timeout: %w", err)
}
}
if aux.FailoverTimeout != "" {
if c.FailoverTimeout, err = time.ParseDuration(aux.FailoverTimeout); err != nil {
return fmt.Errorf("invalid failover_timeout: %w", err)
}
}
return nil
}
func (s *SecurityConfig) UnmarshalJSON(data []byte) error {
type Alias SecurityConfig
aux := &struct {
*Alias
SessionTimeout string `json:"session_timeout"`
LockoutDuration string `json:"lockout_duration"`
}{
Alias: (*Alias)(s),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.SessionTimeout != "" {
if s.SessionTimeout, err = time.ParseDuration(aux.SessionTimeout); err != nil {
return fmt.Errorf("invalid session_timeout: %w", err)
}
}
if aux.LockoutDuration != "" {
if s.LockoutDuration, err = time.ParseDuration(aux.LockoutDuration); err != nil {
return fmt.Errorf("invalid lockout_duration: %w", err)
}
}
return nil
}
// ConfigWatcher interface for configuration change notifications
type ConfigWatcher interface {
OnConfigChange(oldConfig, newConfig *ProductionConfig) error
}
// NewConfigManager creates a new configuration manager
func NewConfigManager(configFile string, logger logger.Logger) *ConfigManager {
return &ConfigManager{
config: DefaultProductionConfig(),
watchers: make([]ConfigWatcher, 0),
logger: logger,
configFile: configFile,
}
}
// DefaultProductionConfig returns default production configuration
func DefaultProductionConfig() *ProductionConfig {
return &ProductionConfig{
Broker: BrokerConfig{
Address: "localhost",
Port: 8080,
MaxConnections: 1000,
ConnectionTimeout: 0, // NO timeout for broker-consumer connections
ReadTimeout: 0, // NO read timeout - consumers need persistent connections
WriteTimeout: 0, // NO write timeout - allow unlimited time for large messages
IdleTimeout: 0, // NO idle timeout - keep connections alive indefinitely
KeepAlive: true,
KeepAlivePeriod: 30 * time.Second,
MaxQueueDepth: 10000,
EnableDeadLetter: true,
DeadLetterMaxRetries: 3,
EnableMetrics: true,
MetricsInterval: 1 * time.Minute,
GracefulShutdown: 30 * time.Second,
MessageTTL: 24 * time.Hour,
Headers: make(map[string]string),
},
Consumer: ConsumerConfig{
MaxRetries: 5,
InitialDelay: 2 * time.Second,
MaxBackoff: 20 * time.Second,
JitterPercent: 0.5,
EnableReconnect: true,
ReconnectInterval: 5 * time.Second,
HealthCheckInterval: 30 * time.Second,
MaxConcurrentTasks: 100,
TaskTimeout: 30 * time.Second,
EnableDeduplication: true,
DeduplicationWindow: 5 * time.Minute,
EnablePriorityQueue: true,
EnableHTTPAPI: true,
HTTPAPIPort: 0, // Random port
EnableCircuitBreaker: true,
CircuitBreakerThreshold: 10,
CircuitBreakerTimeout: 30 * time.Second,
},
Publisher: PublisherConfig{
MaxRetries: 5,
InitialDelay: 2 * time.Second,
MaxBackoff: 20 * time.Second,
JitterPercent: 0.5,
ConnectionPoolSize: 10,
PublishTimeout: 10 * time.Second,
EnableBatching: false,
BatchSize: 100,
BatchTimeout: 1 * time.Second,
EnableCompression: false,
CompressionLevel: 6,
EnableAsync: false,
AsyncBufferSize: 1000,
EnableOrderedDelivery: false,
},
Pool: PoolConfig{
MinWorkers: 1,
MaxWorkers: 100,
QueueSize: 1000,
MaxMemoryLoad: 1024 * 1024 * 1024, // 1GB
TaskTimeout: 30 * time.Second,
IdleWorkerTimeout: 5 * time.Minute,
EnableDynamicScaling: true,
ScalingFactor: 1.5,
ScalingInterval: 1 * time.Minute,
MaxQueueWaitTime: 10 * time.Second,
EnableWorkStealing: false,
EnablePriorityScheduling: true,
GracefulShutdownTimeout: 30 * time.Second,
},
Security: SecurityConfig{
EnableTLS: false,
TLSCertPath: "",
TLSKeyPath: "",
TLSCAPath: "",
TLSInsecureSkipVerify: false,
EnableAuthentication: false,
AuthenticationMethod: "basic",
EnableAuthorization: false,
EnableEncryption: false,
EncryptionKey: "",
EnableAuditLog: false,
AuditLogPath: "/var/log/mq/audit.log",
SessionTimeout: 30 * time.Minute,
MaxLoginAttempts: 3,
LockoutDuration: 15 * time.Minute,
},
Monitoring: MonitoringConfig{
EnableMetrics: true,
MetricsPort: 9090,
MetricsPath: "/metrics",
EnableHealthCheck: true,
HealthCheckPort: 8081,
HealthCheckPath: "/health",
HealthCheckInterval: 30 * time.Second,
EnableTracing: false,
TracingEndpoint: "",
TracingSampleRate: 0.1,
EnableLogging: true,
LogLevel: "info",
LogFormat: "json",
LogOutput: "stdout",
LogFilePath: "/var/log/mq/app.log",
LogMaxSize: 100, // MB
LogMaxBackups: 10,
LogMaxAge: 30, // days
EnableProfiling: false,
ProfilingPort: 6060,
},
Persistence: PersistenceConfig{
EnablePersistence: false,
StorageType: "memory",
ConnectionString: "",
MaxConnections: 10,
ConnectionTimeout: 10 * time.Second,
RetentionPeriod: 7 * 24 * time.Hour, // 7 days
CleanupInterval: 1 * time.Hour,
BackupEnabled: false,
BackupInterval: 6 * time.Hour,
BackupPath: "/var/backup/mq",
CompressionEnabled: true,
EncryptionEnabled: false,
ReplicationEnabled: false,
ReplicationNodes: []string{},
},
Clustering: ClusteringConfig{
EnableClustering: false,
NodeID: "",
ClusterNodes: []string{},
DiscoveryMethod: "static",
DiscoveryEndpoint: "",
HeartbeatInterval: 5 * time.Second,
ElectionTimeout: 15 * time.Second,
EnableLoadBalancing: false,
LoadBalancingStrategy: "round_robin",
EnableFailover: false,
FailoverTimeout: 30 * time.Second,
EnableReplication: false,
ReplicationFactor: 3,
ConsistencyLevel: "strong",
},
RateLimit: RateLimitConfig{
EnableBrokerRateLimit: false,
BrokerRate: 1000,
BrokerBurst: 100,
EnableConsumerRateLimit: false,
ConsumerRate: 100,
ConsumerBurst: 10,
EnablePublisherRateLimit: false,
PublisherRate: 100,
PublisherBurst: 10,
EnablePerQueueRateLimit: false,
PerQueueRate: 50,
PerQueueBurst: 5,
},
LastUpdated: time.Now(),
}
}
// LoadConfig loads configuration from file
func (cm *ConfigManager) LoadConfig() error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.configFile == "" {
cm.logger.Info("No config file specified, using defaults")
return nil
}
data, err := os.ReadFile(cm.configFile)
if err != nil {
if os.IsNotExist(err) {
cm.logger.Info("Config file not found, creating with defaults",
logger.Field{Key: "file", Value: cm.configFile})
return cm.saveConfigLocked()
}
return fmt.Errorf("failed to read config file: %w", err)
}
oldConfig := *cm.config
if err := json.Unmarshal(data, cm.config); err != nil {
return fmt.Errorf("failed to parse config file: %w", err)
}
cm.config.LastUpdated = time.Now()
// Notify watchers
for _, watcher := range cm.watchers {
if err := watcher.OnConfigChange(&oldConfig, cm.config); err != nil {
cm.logger.Error("Config watcher error",
logger.Field{Key: "error", Value: err.Error()})
}
}
cm.logger.Info("Configuration loaded successfully",
logger.Field{Key: "file", Value: cm.configFile})
return nil
}
// SaveConfig saves current configuration to file
func (cm *ConfigManager) SaveConfig() error {
cm.mu.Lock()
defer cm.mu.Unlock()
return cm.saveConfigLocked()
}
func (cm *ConfigManager) saveConfigLocked() error {
if cm.configFile == "" {
return fmt.Errorf("no config file specified")
}
cm.config.LastUpdated = time.Now()
data, err := json.MarshalIndent(cm.config, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal config: %w", err)
}
if err := os.WriteFile(cm.configFile, data, 0644); err != nil {
return fmt.Errorf("failed to write config file: %w", err)
}
cm.logger.Info("Configuration saved successfully",
logger.Field{Key: "file", Value: cm.configFile})
return nil
}
// GetConfig returns a copy of the current configuration
func (cm *ConfigManager) GetConfig() *ProductionConfig {
cm.mu.RLock()
defer cm.mu.RUnlock()
// Return a copy to prevent external modification
configCopy := *cm.config
return &configCopy
}
// UpdateConfig updates the configuration
func (cm *ConfigManager) UpdateConfig(newConfig *ProductionConfig) error {
cm.mu.Lock()
defer cm.mu.Unlock()
oldConfig := *cm.config
// Validate configuration
if err := cm.validateConfig(newConfig); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}
cm.config = newConfig
cm.config.LastUpdated = time.Now()
// Notify watchers
for _, watcher := range cm.watchers {
if err := watcher.OnConfigChange(&oldConfig, cm.config); err != nil {
cm.logger.Error("Config watcher error",
logger.Field{Key: "error", Value: err.Error()})
}
}
// Auto-save if file is specified
if cm.configFile != "" {
if err := cm.saveConfigLocked(); err != nil {
cm.logger.Error("Failed to auto-save configuration",
logger.Field{Key: "error", Value: err.Error()})
}
}
cm.logger.Info("Configuration updated successfully")
return nil
}
// AddWatcher adds a configuration watcher
func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher) {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.watchers = append(cm.watchers, watcher)
}
// RemoveWatcher removes a configuration watcher
func (cm *ConfigManager) RemoveWatcher(watcher ConfigWatcher) {
cm.mu.Lock()
defer cm.mu.Unlock()
for i, w := range cm.watchers {
if w == watcher {
cm.watchers = append(cm.watchers[:i], cm.watchers[i+1:]...)
break
}
}
}
// validateConfig validates the configuration
func (cm *ConfigManager) validateConfig(config *ProductionConfig) error {
// Validate broker config
if config.Broker.Port <= 0 || config.Broker.Port > 65535 {
return fmt.Errorf("invalid broker port: %d", config.Broker.Port)
}
if config.Broker.MaxConnections <= 0 {
return fmt.Errorf("max connections must be positive")
}
// Validate consumer config
if config.Consumer.MaxRetries < 0 {
return fmt.Errorf("max retries cannot be negative")
}
if config.Consumer.JitterPercent < 0 || config.Consumer.JitterPercent > 1 {
return fmt.Errorf("jitter percent must be between 0 and 1")
}
// Validate publisher config
if config.Publisher.ConnectionPoolSize <= 0 {
return fmt.Errorf("connection pool size must be positive")
}
// Validate pool config
if config.Pool.MinWorkers <= 0 {
return fmt.Errorf("min workers must be positive")
}
if config.Pool.MaxWorkers < config.Pool.MinWorkers {
return fmt.Errorf("max workers must be >= min workers")
}
if config.Pool.QueueSize <= 0 {
return fmt.Errorf("queue size must be positive")
}
// Validate security config
if config.Security.EnableTLS {
if config.Security.TLSCertPath == "" || config.Security.TLSKeyPath == "" {
return fmt.Errorf("TLS cert and key paths required when TLS is enabled")
}
}
// Validate monitoring config
if config.Monitoring.EnableMetrics {
if config.Monitoring.MetricsPort <= 0 || config.Monitoring.MetricsPort > 65535 {
return fmt.Errorf("invalid metrics port: %d", config.Monitoring.MetricsPort)
}
}
// Validate clustering config
if config.Clustering.EnableClustering {
if config.Clustering.NodeID == "" {
return fmt.Errorf("node ID required when clustering is enabled")
}
}
return nil
}
// StartWatching starts watching for configuration changes
func (cm *ConfigManager) StartWatching(ctx context.Context, interval time.Duration) {
if cm.configFile == "" {
return
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastModTime time.Time
if stat, err := os.Stat(cm.configFile); err == nil {
lastModTime = stat.ModTime()
}
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stat, err := os.Stat(cm.configFile)
if err != nil {
continue
}
if stat.ModTime().After(lastModTime) {
lastModTime = stat.ModTime()
if err := cm.LoadConfig(); err != nil {
cm.logger.Error("Failed to reload configuration",
logger.Field{Key: "error", Value: err.Error()})
} else {
cm.logger.Info("Configuration reloaded from file")
}
}
}
}
}