From 271beed42991b1af7b3245e8be0f715be761cbf6 Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Sat, 2 Aug 2025 16:17:20 +0545 Subject: [PATCH] update --- codec/codec.go | 60 +++++++------- config_manager.go | 8 +- consumer.go | 7 +- examples/config/production.json | 140 ++++++++++++++++++++------------ mq.go | 110 ++++++++++++++++++------- 5 files changed, 212 insertions(+), 113 deletions(-) diff --git a/codec/codec.go b/codec/codec.go index fd9dd7e..5a70b61 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -52,14 +52,14 @@ type Config struct { BufferPoolSize int } -// DefaultConfig returns default configuration +// DefaultConfig returns default configuration with NO timeouts for persistent connections func DefaultConfig() *Config { return &Config{ MaxMessageSize: MaxMessageSize, MaxHeaderSize: MaxHeaderSize, MaxQueueLength: MaxQueueLength, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, + ReadTimeout: 0, // NO read timeout for persistent broker-consumer connections + WriteTimeout: 0, // NO write timeout for persistent broker-consumer connections EnableCompression: false, BufferPoolSize: 1000, } @@ -252,7 +252,7 @@ func (c *Codec) SendMessage(ctx context.Context, conn net.Conn, msg *Message) er return c.sendRawMessage(ctx, conn, msg) } -// sendRawMessage handles the actual sending of a message or fragment +// sendRawMessage handles the actual sending of a message or fragment WITHOUT timeouts func (c *Codec) sendRawMessage(ctx context.Context, conn net.Conn, msg *Message) error { // Serialize message data, err := msg.Serialize() @@ -283,17 +283,21 @@ func (c *Codec) sendRawMessage(ctx context.Context, conn net.Conn, msg *Message) binary.BigEndian.PutUint32(buffer.B[:4], uint32(len(data))) copy(buffer.B[4:], data) - // Set timeout - deadline := time.Now().Add(c.config.WriteTimeout) - if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) { - deadline = ctxDeadline - } + // CRITICAL: DO NOT set any write deadlines for broker-consumer connections + // These connections must remain open indefinitely for persistent communication + // Only set timeout if explicitly configured AND not zero (for backward compatibility) + if c.config.WriteTimeout > 0 { + deadline := time.Now().Add(c.config.WriteTimeout) + if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) { + deadline = ctxDeadline + } - if err := conn.SetWriteDeadline(deadline); err != nil { - c.incrementErrors() - return fmt.Errorf("failed to set write deadline: %w", err) + if err := conn.SetWriteDeadline(deadline); err != nil { + c.incrementErrors() + return fmt.Errorf("failed to set write deadline: %w", err) + } + defer conn.SetWriteDeadline(time.Time{}) } - defer conn.SetWriteDeadline(time.Time{}) // Write with buffering writer := bufio.NewWriter(conn) @@ -318,7 +322,7 @@ func (c *Codec) sendRawMessage(ctx context.Context, conn net.Conn, msg *Message) return nil } -// ReadMessage reads a message with proper error handling and timeouts +// ReadMessage reads a message WITHOUT timeouts for persistent broker-consumer connections func (c *Codec) ReadMessage(ctx context.Context, conn net.Conn) (*Message, error) { // Check context cancellation before proceeding if err := ctx.Err(); err != nil { @@ -326,24 +330,22 @@ func (c *Codec) ReadMessage(ctx context.Context, conn net.Conn) (*Message, error return nil, fmt.Errorf("context ended before read: %w", err) } - // Check context cancellation before proceeding - if err := ctx.Err(); err != nil { - c.incrementErrors() - return nil, 
fmt.Errorf("context ended before read: %w", err) - } + // CRITICAL: DO NOT set any read deadlines for broker-consumer connections + // These connections must remain open indefinitely for persistent communication + // Only set timeout if explicitly configured AND not zero (for backward compatibility) + if c.config.ReadTimeout > 0 { + deadline := time.Now().Add(c.config.ReadTimeout) + if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) { + deadline = ctxDeadline + } - // Set timeout - deadline := time.Now().Add(c.config.ReadTimeout) - if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) { - deadline = ctxDeadline + if err := conn.SetReadDeadline(deadline); err != nil { + c.incrementErrors() + return nil, fmt.Errorf("failed to set read deadline: %w", err) + } + defer conn.SetReadDeadline(time.Time{}) } - if err := conn.SetReadDeadline(deadline); err != nil { - c.incrementErrors() - return nil, fmt.Errorf("failed to set read deadline: %w", err) - } - defer conn.SetReadDeadline(time.Time{}) - // Read length prefix lengthBytes := make([]byte, 4) if _, err := io.ReadFull(conn, lengthBytes); err != nil { diff --git a/config_manager.go b/config_manager.go index a58daee..50d30cf 100644 --- a/config_manager.go +++ b/config_manager.go @@ -593,10 +593,10 @@ func DefaultProductionConfig() *ProductionConfig { Address: "localhost", Port: 8080, MaxConnections: 1000, - ConnectionTimeout: 30 * time.Second, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 5 * time.Minute, + ConnectionTimeout: 0, // NO timeout for broker-consumer connections + ReadTimeout: 0, // NO read timeout - consumers need persistent connections + WriteTimeout: 0, // NO write timeout - allow unlimited time for large messages + IdleTimeout: 0, // NO idle timeout - keep connections alive indefinitely KeepAlive: true, KeepAlivePeriod: 30 * time.Second, MaxQueueDepth: 10000, diff --git a/consumer.go b/consumer.go index 01f4788..bafa45c 100644 --- a/consumer.go +++ b/consumer.go @@ -652,8 +652,9 @@ func (c *Consumer) Consume(ctx context.Context) error { } } +// processWithTimeout processes messages WITHOUT I/O timeouts for persistent broker connections func (c *Consumer) processWithTimeout(ctx context.Context) error { - // Consumer should wait indefinitely for messages from broker - no I/O timeout + // Consumer should wait indefinitely for messages from broker - NO I/O timeout // Only individual task processing should have timeouts, not the consumer connection c.connMutex.RLock() conn := c.conn @@ -663,7 +664,9 @@ func (c *Consumer) processWithTimeout(ctx context.Context) error { return fmt.Errorf("no connection available") } - // Read message without timeout - consumer should be long-running background service + // CRITICAL: Never set any connection timeouts for broker-consumer communication + // The consumer must maintain a persistent connection to the broker indefinitely + // Read message without ANY timeout - consumer should be long-running background service err := c.readMessage(ctx, conn) // If message was processed successfully, reset reconnection attempts diff --git a/examples/config/production.json b/examples/config/production.json index c58d1a3..72a41bc 100644 --- a/examples/config/production.json +++ b/examples/config/production.json @@ -3,10 +3,10 @@ "address": "localhost", "port": 8080, "max_connections": 1000, - "connection_timeout": "5s", - "read_timeout": "300s", - "write_timeout": "30s", - "idle_timeout": "600s", + "connection_timeout": "0s", + 
"read_timeout": "0s", + "write_timeout": "0s", + "idle_timeout": "0s", "keep_alive": true, "keep_alive_period": "60s", "max_queue_depth": 10000, @@ -29,71 +29,111 @@ "max_retries": 3, "initial_delay": "1s", "max_backoff": "10s", - "confirm_delivery": true, + "jitter_percent": 0.5, + "connection_pool_size": 10, "publish_timeout": "5s", - "connection_pool_size": 10 + "enable_batching": false, + "batch_size": 100, + "batch_timeout": "1s" }, "pool": { + "min_workers": 1, + "max_workers": 100, "queue_size": 1000, - "max_workers": 20, "max_memory_load": 1073741824, - "idle_timeout": "300s", - "graceful_shutdown_timeout": "30s", - "task_timeout": "60s", - "enable_metrics": true, - "enable_diagnostics": true + "task_timeout": "30s", + "idle_worker_timeout": "5m", + "enable_dynamic_scaling": true, + "scaling_factor": 1.5, + "scaling_interval": "1m", + "max_queue_wait_time": "10s", + "enable_work_stealing": false, + "enable_priority_scheduling": true, + "graceful_shutdown_timeout": "30s" }, "security": { "enable_tls": false, - "tls_cert_path": "./certs/server.crt", - "tls_key_path": "./certs/server.key", - "tls_ca_path": "./certs/ca.crt", - "enable_auth": false, - "auth_provider": "jwt", - "jwt_secret": "your-secret-key", + "tls_cert_path": "", + "tls_key_path": "", + "tls_ca_path": "", + "tls_insecure_skip_verify": false, + "enable_authentication": false, + "authentication_method": "basic", + "enable_authorization": false, "enable_encryption": false, - "encryption_key": "32-byte-encryption-key-here!!" + "encryption_key": "", + "enable_audit_log": false, + "audit_log_path": "/var/log/mq/audit.log", + "session_timeout": "30m", + "max_login_attempts": 3, + "lockout_duration": "15m" }, "monitoring": { - "metrics_port": 9090, - "health_check_port": 9091, "enable_metrics": true, - "enable_health_checks": true, - "metrics_interval": "10s", + "metrics_port": 9090, + "metrics_path": "/metrics", + "enable_health_check": true, + "health_check_port": 8081, + "health_check_path": "/health", "health_check_interval": "30s", - "retention_period": "24h", - "enable_tracing": true, - "jaeger_endpoint": "http://localhost:14268/api/traces" + "enable_tracing": false, + "tracing_endpoint": "", + "tracing_sample_rate": 0.1, + "enable_logging": true, + "log_level": "info", + "log_format": "json", + "log_output": "stdout", + "log_file_path": "/var/log/mq/app.log", + "log_max_size": 100, + "log_max_backups": 10, + "log_max_age": 30, + "enable_profiling": false, + "profiling_port": 6060 }, "persistence": { - "enable": true, - "provider": "postgres", - "connection_string": "postgres://user:password@localhost:5432/mq_db?sslmode=disable", - "max_connections": 50, - "connection_timeout": "30s", - "enable_migrations": true, - "backup_enabled": true, - "backup_interval": "6h" + "enable_persistence": false, + "storage_type": "memory", + "connection_string": "", + "max_connections": 10, + "connection_timeout": "10s", + "retention_period": "168h", + "cleanup_interval": "1h", + "backup_enabled": false, + "backup_interval": "6h", + "backup_path": "/var/backup/mq", + "compression_enabled": true, + "encryption_enabled": false, + "replication_enabled": false, + "replication_nodes": [ ] }, "clustering": { - "enable": false, - "node_id": "node-1", - "cluster_name": "mq-cluster", - "peers": [ ], - "election_timeout": "5s", - "heartbeat_interval": "1s", - "enable_auto_discovery": false, - "discovery_port": 7946 + "enable_clustering": false, + "node_id": "", + "cluster_nodes": [ ], + "discovery_method": "static", + "discovery_endpoint": "", + 
"heartbeat_interval": "5s", + "election_timeout": "15s", + "enable_load_balancing": false, + "load_balancing_strategy": "round_robin", + "enable_failover": false, + "failover_timeout": "30s", + "enable_replication": false, + "replication_factor": 3, + "consistency_level": "strong" }, "rate_limit": { + "enable_broker_rate_limit": false, "broker_rate": 1000, "broker_burst": 100, - "consumer_rate": 500, - "consumer_burst": 50, - "publisher_rate": 200, - "publisher_burst": 20, - "global_rate": 2000, - "global_burst": 200 - }, - "last_updated": "2025-07-29T00:00:00Z" + "enable_consumer_rate_limit": false, + "consumer_rate": 100, + "consumer_burst": 10, + "enable_publisher_rate_limit": false, + "publisher_rate": 100, + "publisher_burst": 10, + "enable_per_queue_rate_limit": false, + "per_queue_rate": 50, + "per_queue_burst": 5 + } } diff --git a/mq.go b/mq.go index 25d2231..03594d6 100644 --- a/mq.go +++ b/mq.go @@ -745,52 +745,104 @@ func (b *Broker) Start(ctx context.Context) error { if b.opts.tlsConfig.UseTLS { cert, err := tls.LoadX509KeyPair(b.opts.tlsConfig.CertPath, b.opts.tlsConfig.KeyPath) if err != nil { - return fmt.Errorf("failed to load TLS certificates: %v", err) + return WrapError(err, "failed to load TLS certificates for broker", "BROKER_TLS_CERT_ERROR") } tlsConfig := &tls.Config{ Certificates: []tls.Certificate{cert}, } listener, err = tls.Listen("tcp", b.opts.brokerAddr, tlsConfig) if err != nil { - return fmt.Errorf("failed to start TLS listener: %v", err) + return WrapError(err, "TLS broker failed to listen on "+b.opts.brokerAddr, "BROKER_TLS_LISTEN_ERROR") } - log.Println("BROKER - RUNNING_TLS ~> started on", b.opts.brokerAddr) } else { listener, err = net.Listen("tcp", b.opts.brokerAddr) if err != nil { - return fmt.Errorf("failed to start TCP listener: %v", err) + return WrapError(err, "broker failed to listen on "+b.opts.brokerAddr, "BROKER_LISTEN_ERROR") } - log.Println("BROKER - RUNNING ~> started on", b.opts.brokerAddr) } b.listener = listener defer b.Close() const maxConcurrentConnections = 100 sem := make(chan struct{}, maxConcurrentConnections) for { - conn, err := listener.Accept() - if err != nil { - b.OnError(ctx, conn, err) - time.Sleep(50 * time.Millisecond) - continue - } - sem <- struct{}{} - go func(c net.Conn) { - defer func() { - <-sem - c.Close() - }() - for { - err := b.readMessage(ctx, c) - if err != nil { - if netErr, ok := err.(net.Error); ok && netErr.Temporary() { - log.Println("Temporary network error, retrying:", netErr) - continue - } - log.Println("Connection closed due to error:", err) - break + select { + case <-ctx.Done(): + log.Printf("BROKER - Shutdown signal received") + return ctx.Err() + default: + conn, err := listener.Accept() + if err != nil { + if atomic.LoadInt32(&b.isShutdown) == 1 { + return nil } + log.Printf("BROKER - Error accepting connection: %v", err) + continue } - }(conn) + + // Configure connection for broker-consumer communication with NO timeouts + if tcpConn, ok := conn.(*net.TCPConn); ok { + // Enable TCP keep-alive for all connections + tcpConn.SetKeepAlive(true) + tcpConn.SetKeepAlivePeriod(30 * time.Second) + + // NEVER set any deadlines for broker-consumer connections + // These connections must remain open indefinitely for persistent communication + // DO NOT call: tcpConn.SetReadDeadline() or tcpConn.SetWriteDeadline() + + log.Printf("BROKER - TCP keep-alive enabled for connection from %s (NO timeouts)", conn.RemoteAddr()) + } + + sem <- struct{}{} + go func() { + defer func() { <-sem }() + defer conn.Close() + 
b.handleConnection(ctx, conn) + }() + } + } +} + +// handleConnection handles a single connection with NO timeouts for persistent broker-consumer communication +func (b *Broker) handleConnection(ctx context.Context, conn net.Conn) { + defer func() { + if r := recover(); r != nil { + b.logger.Error("Connection handler panic", + logger.Field{Key: "panic", Value: fmt.Sprintf("%v", r)}, + logger.Field{Key: "remote_addr", Value: conn.RemoteAddr().String()}) + } + conn.Close() + }() + + // CRITICAL: Never set any timeouts on broker-consumer connections + // These connections must remain open indefinitely for persistent communication + + for { + select { + case <-ctx.Done(): + b.logger.Debug("Context cancelled, closing connection", + logger.Field{Key: "remote_addr", Value: conn.RemoteAddr().String()}) + return + default: + // Read message WITHOUT any timeout - this is crucial for persistent connections + if err := b.readMessage(ctx, conn); err != nil { + if err.Error() == "EOF" || strings.Contains(err.Error(), "closed network connection") { + b.logger.Debug("Connection closed by client", + logger.Field{Key: "remote_addr", Value: conn.RemoteAddr().String()}) + return + } + // Don't return on timeout errors - they should not occur since we don't set timeouts + if strings.Contains(err.Error(), "timeout") { + b.logger.Warn("Unexpected timeout on connection (should not happen)", + logger.Field{Key: "remote_addr", Value: conn.RemoteAddr().String()}, + logger.Field{Key: "error", Value: err.Error()}) + continue + } + b.logger.Error("Connection error", + logger.Field{Key: "remote_addr", Value: conn.RemoteAddr().String()}, + logger.Field{Key: "error", Value: err.Error()}) + return + } + } } } @@ -1500,7 +1552,9 @@ func (b *Broker) startEnhancedBroker(ctx context.Context) error { func (b *Broker) handleEnhancedConnection(ctx context.Context, conn net.Conn) { defer func() { if r := recover(); r != nil { - b.logger.Error("Connection handler panic", logger.Field{Key: "panic", Value: fmt.Sprintf("%v", r)}) + b.logger.Error("Connection handler panic", + logger.Field{Key: "panic", Value: fmt.Sprintf("%v", r)}, + logger.Field{Key: "remote_addr", Value: conn.RemoteAddr().String()}) } conn.Close() }()
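
Note (not part of the patch): the codec change above keeps deadlines available as an opt-in. `DefaultConfig()` now returns zero `ReadTimeout`/`WriteTimeout`, and `sendRawMessage`/`ReadMessage` only call `SetWriteDeadline`/`SetReadDeadline` when the configured timeout is greater than zero. A minimal sketch of how a caller might use both modes is below; the import path is assumed from the repository name and may need adjusting.

```go
package main

import (
	"time"

	"github.com/oarkflow/mq/codec" // assumed import path; adjust to the actual module
)

func main() {
	// Default after this patch: zero timeouts, i.e. persistent
	// broker-consumer connections never interrupted by I/O deadlines.
	persistent := codec.DefaultConfig()
	_ = persistent // ReadTimeout == 0 and WriteTimeout == 0

	// Callers that still want bounded I/O (e.g. short-lived admin or
	// health-check connections) can opt back in with non-zero values;
	// the codec only sets deadlines when the timeout is > 0.
	bounded := codec.DefaultConfig()
	bounded.ReadTimeout = 30 * time.Second
	bounded.WriteTimeout = 30 * time.Second
	_ = bounded
}
```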
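
With read/write deadlines removed, dead-peer detection falls to TCP keep-alive, which `Broker.Start` enables on every accepted connection. The sketch below mirrors that pattern in isolation; `configureConn` and the dialed address are illustrative, not part of the patch.

```go
package main

import (
	"log"
	"net"
	"time"
)

// configureConn mirrors the broker's connection setup: enable TCP keep-alive
// so the OS detects half-open connections, but never set read/write deadlines,
// keeping the broker-consumer connection open indefinitely.
func configureConn(conn net.Conn) {
	if tcpConn, ok := conn.(*net.TCPConn); ok {
		if err := tcpConn.SetKeepAlive(true); err != nil {
			log.Printf("keep-alive: %v", err)
		}
		if err := tcpConn.SetKeepAlivePeriod(30 * time.Second); err != nil {
			log.Printf("keep-alive period: %v", err)
		}
		// Deliberately no SetReadDeadline / SetWriteDeadline calls here.
	}
}

func main() {
	// Example: apply to an outbound connection (address is illustrative).
	conn, err := net.Dial("tcp", "localhost:8080")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	configureConn(conn)
}
```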