mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-17 02:10:38 +08:00
update
This commit is contained in:
@@ -252,7 +252,7 @@ func (c *Codec) SendMessage(ctx context.Context, conn net.Conn, msg *Message) er
|
|||||||
return c.sendRawMessage(ctx, conn, msg)
|
return c.sendRawMessage(ctx, conn, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendRawMessage handles the actual sending of a message or fragment WITHOUT timeouts
|
// sendRawMessage handles the actual sending of a message or fragment WITHOUT any timeouts
|
||||||
func (c *Codec) sendRawMessage(ctx context.Context, conn net.Conn, msg *Message) error {
|
func (c *Codec) sendRawMessage(ctx context.Context, conn net.Conn, msg *Message) error {
|
||||||
// Serialize message
|
// Serialize message
|
||||||
data, err := msg.Serialize()
|
data, err := msg.Serialize()
|
||||||
@@ -283,21 +283,9 @@ func (c *Codec) sendRawMessage(ctx context.Context, conn net.Conn, msg *Message)
|
|||||||
binary.BigEndian.PutUint32(buffer.B[:4], uint32(len(data)))
|
binary.BigEndian.PutUint32(buffer.B[:4], uint32(len(data)))
|
||||||
copy(buffer.B[4:], data)
|
copy(buffer.B[4:], data)
|
||||||
|
|
||||||
// CRITICAL: DO NOT set any write deadlines for broker-consumer connections
|
// CRITICAL: NEVER set any write deadlines for broker-consumer connections
|
||||||
// These connections must remain open indefinitely for persistent communication
|
// These connections must remain open indefinitely for persistent communication
|
||||||
// Only set timeout if explicitly configured AND not zero (for backward compatibility)
|
// Completely removed all timeout/deadline logic to prevent I/O timeouts
|
||||||
if c.config.WriteTimeout > 0 {
|
|
||||||
deadline := time.Now().Add(c.config.WriteTimeout)
|
|
||||||
if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) {
|
|
||||||
deadline = ctxDeadline
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := conn.SetWriteDeadline(deadline); err != nil {
|
|
||||||
c.incrementErrors()
|
|
||||||
return fmt.Errorf("failed to set write deadline: %w", err)
|
|
||||||
}
|
|
||||||
defer conn.SetWriteDeadline(time.Time{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write with buffering
|
// Write with buffering
|
||||||
writer := bufio.NewWriter(conn)
|
writer := bufio.NewWriter(conn)
|
||||||
@@ -322,7 +310,7 @@ func (c *Codec) sendRawMessage(ctx context.Context, conn net.Conn, msg *Message)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadMessage reads a message WITHOUT timeouts for persistent broker-consumer connections
|
// ReadMessage reads a message WITHOUT any timeouts for persistent broker-consumer connections
|
||||||
func (c *Codec) ReadMessage(ctx context.Context, conn net.Conn) (*Message, error) {
|
func (c *Codec) ReadMessage(ctx context.Context, conn net.Conn) (*Message, error) {
|
||||||
// Check context cancellation before proceeding
|
// Check context cancellation before proceeding
|
||||||
if err := ctx.Err(); err != nil {
|
if err := ctx.Err(); err != nil {
|
||||||
@@ -330,21 +318,9 @@ func (c *Codec) ReadMessage(ctx context.Context, conn net.Conn) (*Message, error
|
|||||||
return nil, fmt.Errorf("context ended before read: %w", err)
|
return nil, fmt.Errorf("context ended before read: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CRITICAL: DO NOT set any read deadlines for broker-consumer connections
|
// CRITICAL: NEVER set any read deadlines for broker-consumer connections
|
||||||
// These connections must remain open indefinitely for persistent communication
|
// These connections must remain open indefinitely for persistent communication
|
||||||
// Only set timeout if explicitly configured AND not zero (for backward compatibility)
|
// Completely removed all timeout/deadline logic to prevent I/O timeouts
|
||||||
if c.config.ReadTimeout > 0 {
|
|
||||||
deadline := time.Now().Add(c.config.ReadTimeout)
|
|
||||||
if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) {
|
|
||||||
deadline = ctxDeadline
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := conn.SetReadDeadline(deadline); err != nil {
|
|
||||||
c.incrementErrors()
|
|
||||||
return nil, fmt.Errorf("failed to set read deadline: %w", err)
|
|
||||||
}
|
|
||||||
defer conn.SetReadDeadline(time.Time{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read length prefix
|
// Read length prefix
|
||||||
lengthBytes := make([]byte, 4)
|
lengthBytes := make([]byte, 4)
|
||||||
|
32
consumer.go
32
consumer.go
@@ -434,7 +434,7 @@ func (c *Consumer) sendDenyMessage(ctx context.Context, taskID, queue string, er
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// isHealthy checks if the connection is still healthy
|
// isHealthy checks if the connection is still healthy WITHOUT setting deadlines
|
||||||
func (c *Consumer) isHealthy() bool {
|
func (c *Consumer) isHealthy() bool {
|
||||||
c.connMutex.RLock()
|
c.connMutex.RLock()
|
||||||
defer c.connMutex.RUnlock()
|
defer c.connMutex.RUnlock()
|
||||||
@@ -443,27 +443,25 @@ func (c *Consumer) isHealthy() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Simple health check by setting read deadline
|
// CRITICAL: DO NOT set any deadlines on broker-consumer connections
|
||||||
c.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
// These are persistent connections that must remain open indefinitely
|
||||||
defer c.conn.SetReadDeadline(time.Time{})
|
// Instead, use a simple non-blocking connection state check
|
||||||
|
|
||||||
one := make([]byte, 1)
|
// Check if connection is still valid by checking the connection state
|
||||||
n, err := c.conn.Read(one)
|
// without setting any timeouts or deadlines
|
||||||
|
if tcpConn, ok := c.conn.(*net.TCPConn); ok {
|
||||||
if err != nil {
|
// Check TCP connection state without timeouts
|
||||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
// This is a lightweight check that doesn't interfere with persistent connection
|
||||||
return true // Timeout is expected for health check
|
if tcpConn == nil {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
return false
|
// Connection exists and is of correct type - assume healthy
|
||||||
}
|
// The actual health will be determined when we try to read/write
|
||||||
|
|
||||||
// If we read data, put it back (this shouldn't happen in health check)
|
|
||||||
if n > 0 {
|
|
||||||
// This is a simplified health check; in production, you might want to buffer this
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
// For non-TCP connections, assume healthy if connection exists
|
||||||
|
return c.conn != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// startHealthChecker starts periodic health checks
|
// startHealthChecker starts periodic health checks
|
||||||
|
Reference in New Issue
Block a user