mirror of
https://github.com/oarkflow/mq.git
synced 2025-09-26 20:11:16 +08:00
Message Queue Codec
This package provides a robust, production-ready codec implementation for serializing, transmitting, and deserializing messages in a distributed messaging system.
Features
- Message Validation: Comprehensive validation of message format and content
- Efficient Serialization: Pluggable serialization with JSON as default
- Compression: Optional payload compression for large messages
- Encryption: Optional message encryption for sensitive data
- Large Message Support: Automatic fragmentation and reassembly of large messages
- Connection Health: Heartbeat mechanism for connection monitoring
- Performance Optimized: Buffer pooling, efficient memory usage
- Robust Error Handling: Detailed error types and error wrapping
- Timeout Management: Context-aware deadline handling
- Observability: Built-in statistics tracking
Usage
Basic Message Sending/Receiving
// Create a message
msg, err := codec.NewMessage(consts.CmdPublish, payload, "my-queue", headers)
if err != nil {
log.Fatalf("Failed to create message: %v", err)
}
// Send the message
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
codec := codec.NewCodec(codec.DefaultConfig())
if err := codec.SendMessage(ctx, conn, msg); err != nil {
log.Fatalf("Failed to send message: %v", err)
}
// Receive a message
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
receivedMsg, err := codec.ReadMessage(ctx, conn)
if err != nil {
log.Fatalf("Failed to receive message: %v", err)
}
Custom Serialization
// Set a custom marshaller/unmarshaller
codec.SetMarshaller(func(v any) ([]byte, error) {
// Custom serialization logic
return someCustomSerializer.Marshal(v)
})
codec.SetUnmarshaller(func(data []byte, v any) error {
// Custom deserialization logic
return someCustomSerializer.Unmarshal(data, v)
})
Enabling Compression
// Enable compression globally
codec.EnableCompression(true)
// Or configure it per codec instance
config := codec.DefaultConfig()
config.EnableCompression = true
codec := codec.NewCodec(config)
Enabling Encryption
// Generate a secure key
key := make([]byte, 32) // 256-bit key
if _, err := rand.Read(key); err != nil {
log.Fatalf("Failed to generate key: %v", err)
}
// Enable encryption globally
codec.EnableEncryption(true, key)
// Or configure it per serialization manager
config := codec.DefaultSerializationConfig()
config.EnableEncryption = true
config.EncryptionKey = key
config.PreferredCipher = "chacha20poly1305" // or "aes-gcm"
Connection Health Monitoring
// Create a heartbeat manager
codec := codec.NewCodec(codec.DefaultConfig())
hm := codec.NewHeartbeatManager(codec, conn)
// Configure heartbeat
hm.SetInterval(15 * time.Second)
hm.SetTimeout(45 * time.Second)
hm.SetOnFailure(func(err error) {
log.Printf("Heartbeat failure: %v", err)
// Take action like closing connection
})
// Start heartbeat monitoring
hm.Start()
defer hm.Stop()
Configuration
The codec behavior can be customized through the Config
struct:
config := &codec.Config{
MaxMessageSize: 32 * 1024 * 1024, // 32MB max message size
MaxHeaderSize: 64 * 1024, // 64KB max header size
MaxQueueLength: 128, // Max queue name length
ReadTimeout: 15 * time.Second,
WriteTimeout: 10 * time.Second,
EnableCompression: true,
BufferPoolSize: 2000,
}
codec := codec.NewCodec(config)
Error Handling
The codec provides detailed error types for different failure scenarios:
ErrMessageTooLarge
: Message exceeds maximum sizeErrInvalidMessage
: Invalid message formatErrInvalidQueue
: Invalid queue nameErrInvalidCommand
: Invalid commandErrConnectionClosed
: Connection closedErrTimeout
: Operation timeoutErrProtocolMismatch
: Protocol version mismatchErrFragmentationRequired
: Message requires fragmentationErrInvalidFragment
: Invalid message fragmentErrFragmentTimeout
: Timed out waiting for fragmentsErrFragmentMissing
: Missing fragments in sequence
Error handling example:
if err := codec.SendMessage(ctx, conn, msg); err != nil {
if errors.Is(err, codec.ErrMessageTooLarge) {
// Handle message size error
} else if errors.Is(err, codec.ErrTimeout) {
// Handle timeout error
} else {
// Handle other errors
}
}
Testing
The codec package includes testing utilities for validation and performance testing:
ts := codec.NewCodecTestSuite()
// Test basic message sending/receiving
msg, _ := codec.NewMessage(consts.CmdPublish, []byte("test"), "test-queue", nil)
if err := ts.SendReceiveTest(msg); err != nil {
log.Fatalf("Test failed: %v", err)
}
// Test fragmentation/reassembly
largePayload := make([]byte, 20*1024*1024) // 20MB payload
rand.Read(largePayload) // Fill with random data
if err := ts.FragmentationTest(largePayload); err != nil {
log.Fatalf("Fragmentation test failed: %v", err)
}