Files
mq/codec
2025-09-24 16:01:07 +05:45
..
2025-09-24 16:01:07 +05:45
2025-07-31 09:31:28 +05:45
2025-07-31 09:31:28 +05:45
2025-07-31 09:31:28 +05:45
2025-07-31 09:31:28 +05:45

Message Queue Codec

This package provides a robust, production-ready codec implementation for serializing, transmitting, and deserializing messages in a distributed messaging system.

Features

  • Message Validation: Comprehensive validation of message format and content
  • Efficient Serialization: Pluggable serialization with JSON as default
  • Compression: Optional payload compression for large messages
  • Encryption: Optional message encryption for sensitive data
  • Large Message Support: Automatic fragmentation and reassembly of large messages
  • Connection Health: Heartbeat mechanism for connection monitoring
  • Performance Optimized: Buffer pooling, efficient memory usage
  • Robust Error Handling: Detailed error types and error wrapping
  • Timeout Management: Context-aware deadline handling
  • Observability: Built-in statistics tracking

Usage

Basic Message Sending/Receiving

// Create a message
msg, err := codec.NewMessage(consts.CmdPublish, payload, "my-queue", headers)
if err != nil {
    log.Fatalf("Failed to create message: %v", err)
}

// Send the message
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

codec := codec.NewCodec(codec.DefaultConfig())
if err := codec.SendMessage(ctx, conn, msg); err != nil {
    log.Fatalf("Failed to send message: %v", err)
}

// Receive a message
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

receivedMsg, err := codec.ReadMessage(ctx, conn)
if err != nil {
    log.Fatalf("Failed to receive message: %v", err)
}

Custom Serialization

// Set a custom marshaller/unmarshaller
codec.SetMarshaller(func(v any) ([]byte, error) {
    // Custom serialization logic
    return someCustomSerializer.Marshal(v)
})

codec.SetUnmarshaller(func(data []byte, v any) error {
    // Custom deserialization logic
    return someCustomSerializer.Unmarshal(data, v)
})

Enabling Compression

// Enable compression globally
codec.EnableCompression(true)

// Or configure it per codec instance
config := codec.DefaultConfig()
config.EnableCompression = true
codec := codec.NewCodec(config)

Enabling Encryption

// Generate a secure key
key := make([]byte, 32) // 256-bit key
if _, err := rand.Read(key); err != nil {
    log.Fatalf("Failed to generate key: %v", err)
}

// Enable encryption globally
codec.EnableEncryption(true, key)

// Or configure it per serialization manager
config := codec.DefaultSerializationConfig()
config.EnableEncryption = true
config.EncryptionKey = key
config.PreferredCipher = "chacha20poly1305" // or "aes-gcm"

Connection Health Monitoring

// Create a heartbeat manager
codec := codec.NewCodec(codec.DefaultConfig())
hm := codec.NewHeartbeatManager(codec, conn)

// Configure heartbeat
hm.SetInterval(15 * time.Second)
hm.SetTimeout(45 * time.Second)
hm.SetOnFailure(func(err error) {
    log.Printf("Heartbeat failure: %v", err)
    // Take action like closing connection
})

// Start heartbeat monitoring
hm.Start()
defer hm.Stop()

Configuration

The codec behavior can be customized through the Config struct:

config := &codec.Config{
    MaxMessageSize:    32 * 1024 * 1024, // 32MB max message size
    MaxHeaderSize:     64 * 1024,       // 64KB max header size
    MaxQueueLength:    128,             // Max queue name length
    ReadTimeout:       15 * time.Second,
    WriteTimeout:      10 * time.Second,
    EnableCompression: true,
    BufferPoolSize:    2000,
}

codec := codec.NewCodec(config)

Error Handling

The codec provides detailed error types for different failure scenarios:

  • ErrMessageTooLarge: Message exceeds maximum size
  • ErrInvalidMessage: Invalid message format
  • ErrInvalidQueue: Invalid queue name
  • ErrInvalidCommand: Invalid command
  • ErrConnectionClosed: Connection closed
  • ErrTimeout: Operation timeout
  • ErrProtocolMismatch: Protocol version mismatch
  • ErrFragmentationRequired: Message requires fragmentation
  • ErrInvalidFragment: Invalid message fragment
  • ErrFragmentTimeout: Timed out waiting for fragments
  • ErrFragmentMissing: Missing fragments in sequence

Error handling example:

if err := codec.SendMessage(ctx, conn, msg); err != nil {
    if errors.Is(err, codec.ErrMessageTooLarge) {
        // Handle message size error
    } else if errors.Is(err, codec.ErrTimeout) {
        // Handle timeout error
    } else {
        // Handle other errors
    }
}

Testing

The codec package includes testing utilities for validation and performance testing:

ts := codec.NewCodecTestSuite()

// Test basic message sending/receiving
msg, _ := codec.NewMessage(consts.CmdPublish, []byte("test"), "test-queue", nil)
if err := ts.SendReceiveTest(msg); err != nil {
    log.Fatalf("Test failed: %v", err)
}

// Test fragmentation/reassembly
largePayload := make([]byte, 20*1024*1024) // 20MB payload
rand.Read(largePayload)                   // Fill with random data
if err := ts.FragmentationTest(largePayload); err != nil {
    log.Fatalf("Fragmentation test failed: %v", err)
}