From a87eeb8a3032d9787ffcd334f1a99ffee1e271e9 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Tue, 14 Oct 2025 10:44:21 +0800 Subject: [PATCH] fix: update gotask version and update bufreader doc --- doc/bufreader_analysis.md | 1469 +++++++++++++-------------------- doc_CN/bufreader_analysis.md | 1471 +++++++++++++--------------------- go.mod | 8 +- go.sum | 9 +- 4 files changed, 1133 insertions(+), 1824 deletions(-) diff --git a/doc/bufreader_analysis.md b/doc/bufreader_analysis.md index 1e92b71..0245dcb 100644 --- a/doc/bufreader_analysis.md +++ b/doc/bufreader_analysis.md @@ -1,1038 +1,697 @@ -# BufReader: Zero-Copy Network Reading with Advanced Memory Management +# BufReader: Zero-Copy Network Reading with Non-Contiguous Memory Buffers ## Table of Contents -- [1. Memory Allocation Issues in Standard Library bufio.Reader](#1-memory-allocation-issues-in-standard-library-bufioreader) -- [2. BufReader: A Zero-Copy Solution](#2-bufreader-a-zero-copy-solution) -- [3. Performance Benchmarks](#3-performance-benchmarks) -- [4. Real-World Use Cases](#4-real-world-use-cases) -- [5. Best Practices](#5-best-practices) -- [6. Performance Optimization Tips](#6-performance-optimization-tips) -- [7. Summary](#7-summary) +- [1. Problem: Traditional Contiguous Memory Buffer Bottlenecks](#1-problem-traditional-contiguous-memory-buffer-bottlenecks) +- [2. Core Solution: Non-Contiguous Memory Buffer Passing Mechanism](#2-core-solution-non-contiguous-memory-buffer-passing-mechanism) +- [3. Performance Validation](#3-performance-validation) +- [4. 
Usage Guide](#4-usage-guide) ## TL;DR (Key Takeaways) -If you're short on time, here are the most important conclusions: +**Core Innovation**: Non-Contiguous Memory Buffer Passing Mechanism +- Data stored as **chained memory blocks**, non-contiguous layout +- Pass references via **ReadRange callback**, zero-copy +- Memory blocks **reused from object pool**, avoiding allocation and GC -**BufReader's Core Advantages** (Concurrent Scenarios): -- ⭐ **98.5% GC Reduction**: 134 GCs → 2 GCs (streaming server scenario) -- 🚀 **99.93% Less Allocations**: 5.57 million → 3,918 allocations -- 🔄 **10-20x Throughput Improvement**: Zero allocation + memory reuse - -**Key Data**: +**Performance Data** (Streaming server, 100 concurrent streams): ``` -Streaming Server Scenario (100 concurrent streams): -bufio.Reader: 79 GB allocated, 134 GCs -BufReader: 0.6 GB allocated, 2 GCs +bufio.Reader: 79 GB allocated, 134 GCs, 374.6 ns/op +BufReader: 0.6 GB allocated, 2 GCs, 30.29 ns/op + +Result: 98.5% GC reduction, 11.6x throughput improvement ``` -**Ideal Use Cases**: -- ✅ High-concurrency network servers -- ✅ Streaming media processing -- ✅ Long-running services (24/7) - -**Quick Test**: -```bash -sh scripts/benchmark_bufreader.sh -``` +**Ideal For**: High-concurrency network servers, streaming media, long-running services --- -## Introduction +## 1. Problem: Traditional Contiguous Memory Buffer Bottlenecks -In high-performance network programming, frequent memory allocation and copying are major sources of performance bottlenecks. While Go's standard library `bufio.Reader` provides buffered reading capabilities, it still involves significant memory allocation and copying operations when processing network data streams. This article provides an in-depth analysis of these issues and introduces `BufReader` from the Monibuca project, demonstrating how to achieve zero-copy, high-performance network data reading through the GoMem memory allocator. 
+### 1.1 bufio.Reader's Contiguous Memory Model -## 1. Memory Allocation Issues in Standard Library bufio.Reader - -### 1.1 How bufio.Reader Works - -`bufio.Reader` uses a fixed-size internal buffer to reduce system call frequency: +The standard library `bufio.Reader` uses a **fixed-size contiguous memory buffer**: ```go type Reader struct { - buf []byte // Fixed-size buffer - rd io.Reader // Underlying reader - r, w int // Read/write positions + buf []byte // Single contiguous buffer (e.g., 4KB) + r, w int // Read/write pointers } func (b *Reader) Read(p []byte) (n int, err error) { - // 1. If buffer is empty, read data from underlying reader to fill buffer - if b.r == b.w { - n, err = b.rd.Read(b.buf) // Data copied to internal buffer - b.w += n - } - - // 2. Copy data from buffer to target slice - n = copy(p, b.buf[b.r:b.w]) // Another data copy - b.r += n + // Copy from contiguous buffer to target + n = copy(p, b.buf[b.r:b.w]) // Must copy return } ``` -### 1.2 Memory Allocation Problem Analysis +**Cost of Contiguous Memory**: -When using `bufio.Reader` to read network data, the following issues exist: +``` +Reading 16KB data (with 4KB buffer): -**Issue 1: Multiple Memory Copies** +Network → bufio buffer → User buffer + ↓ (4KB contiguous) ↓ +1st [████] → Copy to result[0:4KB] +2nd [████] → Copy to result[4KB:8KB] +3rd [████] → Copy to result[8KB:12KB] +4th [████] → Copy to result[12KB:16KB] -```mermaid -sequenceDiagram - participant N as Network Socket - participant B as bufio.Reader Internal Buffer - participant U as User Buffer - participant A as Application Layer - - N->>B: System call reads data (1st copy) - Note over B: Data stored in fixed buffer - B->>U: copy() to user buffer (2nd copy) - Note over U: User gets data copy - U->>A: Pass to application layer (possible 3rd copy) - Note over A: Application processes data +Total: 4 network reads + 4 memory copies +Allocates result (16KB contiguous memory) ``` -Each read operation requires at least two memory 
copies: -1. From network socket to `bufio.Reader`'s internal buffer -2. From internal buffer to user-provided slice +### 1.2 Issues in High-Concurrency Scenarios -**Issue 2: Fixed Buffer Limitations** +In streaming servers (100 concurrent connections, 30fps each): ```go -// bufio.Reader uses fixed-size buffer -reader := bufio.NewReaderSize(conn, 4096) // Fixed 4KB - -// Reading large chunks requires multiple operations -data := make([]byte, 16384) // Need to read 16KB -for total := 0; total < 16384; { - n, err := reader.Read(data[total:]) // Need to loop 4 times - total += n -} -``` - -**Issue 3: Frequent Memory Allocation** - -```go -// Each read requires allocating new slices -func processPackets(reader *bufio.Reader) { +// Typical processing pattern +func handleStream(conn net.Conn) { + reader := bufio.NewReaderSize(conn, 4096) for { - // Allocate new memory for each packet - header := make([]byte, 4) // Allocation 1 - reader.Read(header) + // Allocate contiguous buffer for each packet + packet := make([]byte, 1024) // Allocation 1 + n, _ := reader.Read(packet) // Copy 1 - size := binary.BigEndian.Uint32(header) - payload := make([]byte, size) // Allocation 2 - reader.Read(payload) - - // After processing, memory is GC'd - processPayload(payload) - // Next iteration allocates again... + // Forward to multiple subscribers + for _, sub := range subscribers { + data := make([]byte, n) // Allocations 2-N + copy(data, packet[:n]) // Copies 2-N + sub.Write(data) + } } } + +// Performance impact: +// 100 connections × 30fps × (1 + subscribers) allocations = massive temporary memory +// Triggers frequent GC, system instability ``` -### 1.3 Performance Impact +**Core Problems**: +1. Must maintain contiguous memory layout → Frequent copying +2. Allocate new buffer for each packet → Massive temporary objects +3. Forwarding requires multiple copies → CPU wasted on memory operations -In high-frequency network data processing scenarios, these issues lead to: - -1. 
**Increased CPU Overhead**: Frequent `copy()` operations consume CPU resources -2. **Higher GC Pressure**: Massive temporary memory allocations increase garbage collection burden -3. **Increased Latency**: Each memory allocation and copy adds processing latency -4. **Reduced Throughput**: Memory operations become bottlenecks, limiting overall throughput - -## 2. BufReader: A Zero-Copy Solution +## 2. Core Solution: Non-Contiguous Memory Buffer Passing Mechanism ### 2.1 Design Philosophy -`BufReader` is designed based on the following core principles: +BufReader uses **non-contiguous memory block chains**: -1. **Zero-Copy Reading**: Read directly from network to final memory location, avoiding intermediate copies -2. **Memory Reuse**: Reuse memory blocks through GoMem allocator, avoiding frequent allocations -3. **Chained Buffering**: Use multiple memory blocks in a linked list instead of a single fixed buffer -4. **On-Demand Allocation**: Dynamically adjust memory usage based on actual read amount +``` +No longer require data in contiguous memory: +1. Data scattered across multiple memory blocks (linked list) +2. Each block independently managed and reused +3. Pass by reference, no data copying +``` -### 2.2 Core Data Structures +**Core Data Structures**: ```go type BufReader struct { - Allocator *ScalableMemoryAllocator // Scalable memory allocator - buf MemoryReader // Memory block chain reader - totalRead int // Total bytes read - BufLen int // Block size per read - Mouth chan []byte // Data input channel - feedData func() error // Data feeding function + Allocator *ScalableMemoryAllocator // Object pool allocator + buf MemoryReader // Memory block chain } -// MemoryReader manages multiple memory blocks type MemoryReader struct { - *Memory // Memory manager - Buffers [][]byte // Memory block chain - Size int // Total size - Length int // Readable length + Buffers [][]byte // Multiple memory blocks, non-contiguous! 
+ Size int // Total size + Length int // Readable length } ``` -### 2.3 Workflow +### 2.2 Non-Contiguous Memory Buffer Model -#### 2.3.1 Zero-Copy Data Reading Flow +#### Contiguous vs Non-Contiguous Comparison + +``` +bufio.Reader (Contiguous Memory): +┌─────────────────────────────────┐ +│ 4KB Fixed Buffer │ +│ [Read][Available] │ +└─────────────────────────────────┘ +- Must copy to contiguous target buffer +- Fixed size limitation +- Read portion wastes space + +BufReader (Non-Contiguous Memory): +┌──────┐ ┌──────┐ ┌────────┐ ┌──────┐ +│Block1│→│Block2│→│ Block3 │→│Block4│ +│ 512B │ │ 1KB │ │ 2KB │ │ 3KB │ +└──────┘ └──────┘ └────────┘ └──────┘ +- Directly pass reference to each block (zero-copy) +- Flexible block sizes +- Recycle immediately after processing +``` + +#### Memory Block Chain Workflow ```mermaid sequenceDiagram - participant N as Network Socket - participant A as ScalableMemoryAllocator + participant N as Network + participant P as Object Pool participant B as BufReader.buf participant U as User Code - U->>B: Read(n) - B->>B: Check if buffer has data - alt Buffer empty - B->>A: Request memory block - Note over A: Get from pool or allocate new block - A-->>B: Return memory block reference - B->>N: Read directly to memory block - Note over N,B: Zero-copy: data written to final location - end - B-->>U: Return slice view of memory block - Note over U: User uses directly, no copy needed - U->>U: Process data - U->>A: Recycle memory block (optional) - Note over A: Block returns to pool for reuse + N->>P: 1st read (returns 512B) + P-->>B: Block1 (512B) - from pool or new + B->>B: Buffers = [Block1] + + N->>P: 2nd read (returns 1KB) + P-->>B: Block2 (1KB) - reused from pool + B->>B: Buffers = [Block1, Block2] + + N->>P: 3rd read (returns 2KB) + P-->>B: Block3 (2KB) + B->>B: Buffers = [Block1, Block2, Block3] + + U->>B: ReadRange(4096) + B->>U: yield(Block1) - pass reference + B->>U: yield(Block2) - pass reference + B->>U: yield(Block3) - pass reference + 
B->>U: yield(Block4[0:512]) + + U->>B: Processing complete + B->>P: Recycle Block1, Block2, Block3, Block4 + Note over P: Memory blocks return to pool for reuse ``` -#### 2.3.2 Memory Block Management Flow +### 2.3 Zero-Copy Passing: ReadRange API -```mermaid -graph TD - A[Start Reading] --> B{buf has data?} - B -->|Yes| C[Return data view directly] - B -->|No| D[Call feedData] - D --> E[Allocator.Read requests memory] - E --> F{Pool has free block?} - F -->|Yes| G[Reuse existing memory block] - F -->|No| H[Allocate new memory block] - G --> I[Read data from network] - H --> I - I --> J[Append to buf.Buffers] - J --> K[Update Size and Length] - K --> C - C --> L[User reads data] - L --> M{Data processed?} - M -->|Yes| N[ClipFront recycle front blocks] - N --> O[Allocator.Free return to pool] - O --> P[End] - M -->|No| A -``` - -### 2.4 Core Implementation Analysis - -#### 2.4.1 Initialization and Memory Allocation +**Core API**: ```go -func NewBufReader(reader io.Reader) *BufReader { - return NewBufReaderWithBufLen(reader, defaultBufSize) -} +func (r *BufReader) ReadRange(n int, yield func([]byte)) error +``` -func NewBufReaderWithBufLen(reader io.Reader, bufLen int) *BufReader { - r := &BufReader{ - Allocator: NewScalableMemoryAllocator(bufLen), // Create allocator - BufLen: bufLen, +**How It Works**: + +```go +// Internal implementation (simplified) +func (r *BufReader) ReadRange(n int, yield func([]byte)) error { + remaining := n + + // Iterate through memory block chain + for _, block := range r.buf.Buffers { + if remaining <= 0 { + break + } + + if len(block) <= remaining { + // Pass entire block + yield(block) // Zero-copy: pass reference directly! 
+ remaining -= len(block) + } else { + // Pass portion + yield(block[:remaining]) + remaining = 0 + } + } + + // Recycle processed blocks + r.recycleFront() + return nil +} +``` + +**Usage Example**: + +```go +// Read 4096 bytes of data +reader.ReadRange(4096, func(chunk []byte) { + // chunk is reference to original memory block + // May be called multiple times with different sized blocks + // e.g.: 512B, 1KB, 2KB, 512B + + processData(chunk) // Process directly, zero-copy! +}) + +// Characteristics: +// - No need to allocate target buffer +// - No need to copy data +// - Each chunk automatically recycled after processing +``` + +### 2.4 Advantages in Real Network Scenarios + +**Scenario: Read 10KB from network, each read returns 500B-2KB** + +``` +bufio.Reader (Contiguous Memory): +1. Read 2KB to internal buffer (contiguous) +2. Copy 2KB to user buffer ← Copy +3. Read 1.5KB to internal buffer +4. Copy 1.5KB to user buffer ← Copy +5. Read 2KB... +6. Copy 2KB... ← Copy +... Repeat ... +Total: Multiple network reads + Multiple memory copies +Must allocate 10KB contiguous buffer + +BufReader (Non-Contiguous Memory): +1. Read 2KB → Block1, append to chain +2. Read 1.5KB → Block2, append to chain +3. Read 2KB → Block3, append to chain +4. Read 2KB → Block4, append to chain +5. Read 2.5KB → Block5, append to chain +6. 
ReadRange(10KB): + → yield(Block1) - 2KB + → yield(Block2) - 1.5KB + → yield(Block3) - 2KB + → yield(Block4) - 2KB + → yield(Block5) - 2.5KB +Total: Multiple network reads + 0 memory copies +No contiguous memory needed, process block by block +``` + +### 2.5 Real Application: Stream Forwarding + +**Problem Scenario**: 100 concurrent streams, each forwarded to 10 subscribers + +**Traditional Approach** (Contiguous Memory): + +```go +func forwardStream_Traditional(reader *bufio.Reader, subscribers []net.Conn) { + packet := make([]byte, 4096) // Alloc 1: contiguous memory + n, _ := reader.Read(packet) // Copy 1: from bufio buffer + + // Copy for each subscriber + for _, sub := range subscribers { + data := make([]byte, n) // Allocs 2-11: 10 times + copy(data, packet[:n]) // Copies 2-11: 10 times + sub.Write(data) + } +} +// Per packet: 11 allocations + 11 copies +// 100 concurrent × 30fps × 11 = 33,000 allocations/sec +``` + +**BufReader Approach** (Non-Contiguous Memory): + +```go +func forwardStream_BufReader(reader *BufReader, subscribers []net.Conn) { + reader.ReadRange(4096, func(chunk []byte) { + // chunk is original memory block reference, may be non-contiguous + // All subscribers share the same memory block! 
+ + for _, sub := range subscribers { + sub.Write(chunk) // Send reference directly, zero-copy + } + }) +} +// Per packet: 0 allocations + 0 copies +// 100 concurrent × 30fps × 0 = 0 allocations/sec +``` + +**Performance Comparison**: +- Allocations: 33,000/sec → 0/sec +- Memory copies: 33,000/sec → 0/sec +- GC pressure: High → Very low + +### 2.6 Memory Block Lifecycle + +```mermaid +stateDiagram-v2 + [*] --> Get from Pool + Get from Pool --> Read Network Data + Read Network Data --> Append to Chain + Append to Chain --> Pass to User + Pass to User --> User Processing + User Processing --> Recycle to Pool + Recycle to Pool --> Get from Pool + + note right of Get from Pool + Reuse existing blocks + Avoid GC + end note + + note right of Pass to User + Pass reference, zero-copy + May pass to multiple subscribers + end note + + note right of Recycle to Pool + Active recycling + Immediately reusable + end note +``` + +**Key Points**: +1. Memory blocks **circularly reused** in pool, bypassing GC +2. Pass references instead of copying data, achieving zero-copy +3. 
Recycle immediately after processing, minimizing memory footprint + +### 2.7 Core Code Implementation + +```go +// Create BufReader +func NewBufReader(reader io.Reader) *BufReader { + return &BufReader{ + Allocator: NewScalableMemoryAllocator(16384), // Object pool feedData: func() error { - // Key: Read from allocator, fill directly to memory block + // Get memory block from pool, read network data directly buf, err := r.Allocator.Read(reader, r.BufLen) if err != nil { return err } - n := len(buf) - r.totalRead += n - // Directly append memory block reference, no copy + // Append to chain (only add reference) r.buf.Buffers = append(r.buf.Buffers, buf) - r.buf.Size += n - r.buf.Length += n + r.buf.Length += len(buf) return nil }, } - r.buf.Memory = &Memory{} - return r -} -``` - -**Zero-Copy Key Points**: -- `Allocator.Read()` reads directly from `io.Reader` to allocated memory block -- Returned `buf` is a reference to the actual data storage memory block -- `append(r.buf.Buffers, buf)` only appends reference, no data copy - -#### 2.4.2 Read Operations - -```go -func (r *BufReader) ReadByte() (b byte, err error) { - // If buffer is empty, trigger data filling - for r.buf.Length == 0 { - if err = r.feedData(); err != nil { - return - } - } - // Read from memory block chain, no copy needed - return r.buf.ReadByte() } +// Zero-copy reading func (r *BufReader) ReadRange(n int, yield func([]byte)) error { - for r.recycleFront(); n > 0 && err == nil; err = r.feedData() { - if r.buf.Length > 0 { - if r.buf.Length >= n { - // Directly pass slice view of memory block, no copy - r.buf.RangeN(n, yield) - return - } - n -= r.buf.Length - r.buf.Range(yield) - } + for r.buf.Length < n { + r.feedData() // Read more data from network } - return -} -``` - -**Zero-Copy Benefits**: -- `yield` callback receives a slice view of the memory block -- User code directly operates on original memory blocks without intermediate copying -- After reading, processed blocks are automatically 
recycled - -#### 2.4.3 Memory Recycling - -```go -func (r *BufReader) recycleFront() { - // Clean up processed memory blocks - r.buf.ClipFront(r.Allocator.Free) + + // Pass references block by block + for _, block := range r.buf.Buffers { + yield(block) // Zero-copy passing + } + + // Recycle processed blocks + r.recycleFront() + return nil } +// Recycle memory blocks to pool func (r *BufReader) Recycle() { - r.buf = MemoryReader{} if r.Allocator != nil { - // Return all memory blocks to allocator - r.Allocator.Recycle() - } - if r.Mouth != nil { - close(r.Mouth) + r.Allocator.Recycle() // Return all blocks to pool } } ``` -### 2.5 Comparison with bufio.Reader +## 3. Performance Validation -```mermaid -graph LR - subgraph "bufio.Reader (Multiple Copies)" - A1[Network] -->|System Call| B1[Kernel Buffer] - B1 -->|Copy 1| C1[bufio Buffer] - C1 -->|Copy 2| D1[User Slice] - D1 -->|Copy 3?| E1[Application] - end - - subgraph "BufReader (Zero-Copy)" - A2[Network] -->|System Call| B2[Kernel Buffer] - B2 -->|Direct Read| C2[GoMem Block] - C2 -->|Slice View| D2[User Code] - D2 -->|Recycle| C2 - C2 -->|Reuse| C2 - end -``` +### 3.1 Test Design -| Feature | bufio.Reader | BufReader | -|---------|-------------|-----------| -| Memory Copies | 2-3 times | 0 times (slice view) | -| Buffer Mode | Fixed-size single buffer | Variable-size chained buffer | -| Memory Allocation | May allocate each read | Object pool reuse | -| Memory Recycling | GC automatic | Active return to pool | -| Large Data Handling | Multiple operations needed | Single append to chain | -| GC Pressure | High | Very low | +**Real Network Simulation**: Each read returns random size (64-2048 bytes), simulating real network fluctuations -## 3. Performance Benchmarks +**Core Test Scenarios**: +1. **Concurrent Network Connection Reading** - Simulate 100+ concurrent connections +2. **GC Pressure Test** - Demonstrate long-term running differences +3. 
**Streaming Server** - Real business scenario (100 streams × forwarding) -### 3.1 Test Scenario Design +### 3.2 Performance Test Results -#### 3.1.1 Real Network Simulation +**Test Environment**: Apple M2 Pro, Go 1.23.0 -To make benchmarks more realistic, we implemented a `mockNetworkReader` that simulates real network behavior. +#### GC Pressure Test (Core Comparison) -**Real Network Characteristics**: +| Metric | bufio.Reader | BufReader | Improvement | +|--------|-------------|-----------|-------------| +| Operation Latency | 1874 ns/op | 112.7 ns/op | **16.6x faster** | +| Allocation Count | 5,576,659 | 3,918 | **99.93% reduction** | +| Per Operation | 2 allocs/op | 0 allocs/op | **Zero allocation** | +| Throughput | 2.8M ops/s | 45.7M ops/s | **16x improvement** | -In real network reading scenarios, the data length returned by each `Read()` call is **uncertain**, affected by multiple factors: +#### Streaming Server Scenario -- TCP receive window size -- Network latency and bandwidth -- OS buffer state -- Network congestion -- Network quality fluctuations +| Metric | bufio.Reader | BufReader | Improvement | +|--------|-------------|-----------|-------------| +| Operation Latency | 374.6 ns/op | 30.29 ns/op | **12.4x faster** | +| Memory Allocation | 79,508 MB | 601 MB | **99.2% reduction** | +| **GC Runs** | **134** | **2** | **98.5% reduction** ⭐ | +| Throughput | 10.1M ops/s | 117M ops/s | **11.6x improvement** | -**Simulation Implementation**: - -```go -type mockNetworkReader struct { - data []byte - offset int - rng *rand.Rand - minChunk int // Minimum chunk size - maxChunk int // Maximum chunk size -} - -func (m *mockNetworkReader) Read(p []byte) (n int, err error) { - // Each time return random length data between minChunk and maxChunk - chunkSize := m.minChunk + m.rng.Intn(m.maxChunk-m.minChunk+1) - n = copy(p[:chunkSize], m.data[m.offset:]) - m.offset += n - return n, nil -} -``` - -**Different Network Condition Simulations**: - -| Network Condition | 
Data Block Range | Real Scenario | -|------------------|-----------------|---------------| -| Good Network | 1024-4096 bytes | Stable LAN, premium network | -| Normal Network | 256-2048 bytes | Regular internet connection | -| Poor Network | 64-512 bytes | High latency, small TCP window | -| Worst Network | 1-128 bytes | Mobile network, severe congestion | - -This simulation makes benchmark results more realistic and reliable. - -#### 3.1.2 Test Scenario List - -We focus on the following core scenarios: - -1. **Concurrent Network Connection Reading** - Demonstrates zero allocation -2. **Concurrent Protocol Parsing** - Simulates real applications -3. **GC Pressure Test** - Shows long-term running advantages ⭐ -4. **Streaming Server Scenario** - Real business scenario ⭐ - -### 3.2 Benchmark Design - -#### Core Test Scenarios - -Benchmarks focus on **concurrent network scenarios** and **GC pressure** comparison: - -**1. Concurrent Network Connection Reading** -- Simulates 100+ concurrent connections continuously reading data -- Each read processes 1KB data packets -- bufio: Allocates new buffer each time (`make([]byte, 1024)`) -- BufReader: Zero-copy processing (`ReadRange`) - -**2. Concurrent Protocol Parsing** -- Simulates streaming server parsing protocol packets -- Reads packet header (4 bytes) + data content -- Compares memory allocation strategies - -**3. GC Pressure Test** (⭐ Core) -- Continuous concurrent reading and processing -- Tracks GC count, total memory allocation, allocation count -- Demonstrates differences in long-term running - -**4. 
Streaming Server Scenario** (⭐ Real Application) -- Simulates 100 concurrent streams -- Each stream reads and forwards data to subscribers -- Complete real application scenario comparison - -#### Key Test Logic - -**Concurrent Reading**: -```go -// bufio.Reader - Allocate each time -buf := make([]byte, 1024) // 1KB allocation -n, _ := reader.Read(buf) -processData(buf[:n]) - -// BufReader - Zero-copy -reader.ReadRange(1024, func(data []byte) { - processData(data) // Direct use, no allocation -}) -``` - -**GC Statistics**: -```go -// Record GC statistics -var beforeGC, afterGC runtime.MemStats -runtime.ReadMemStats(&beforeGC) - -b.RunParallel(func(pb *testing.PB) { - // Concurrent testing... -}) - -runtime.ReadMemStats(&afterGC) -b.ReportMetric(float64(afterGC.NumGC-beforeGC.NumGC), "gc-runs") -b.ReportMetric(float64(afterGC.TotalAlloc-beforeGC.TotalAlloc)/1024/1024, "MB-alloc") -``` - -Complete test code: `pkg/util/buf_reader_benchmark_test.go` - -### 3.3 Running Benchmarks - -We provide complete benchmark code (`pkg/util/buf_reader_benchmark_test.go`) and convenient test scripts. - -#### Method 1: Using Test Script (Recommended) - -```bash -# Run complete benchmark suite -sh scripts/benchmark_bufreader.sh -``` - -This script will run all tests sequentially and output user-friendly results. 
- -#### Method 2: Manual Testing - -```bash -cd pkg/util - -# Run all benchmarks -go test -bench=BenchmarkConcurrent -benchmem -benchtime=2s -test.run=xxx - -# Run specific tests -go test -bench=BenchmarkGCPressure -benchmem -benchtime=5s -test.run=xxx - -# Run streaming server scenario -go test -bench=BenchmarkStreamingServer -benchmem -benchtime=3s -test.run=xxx -``` - -#### Method 3: Run Key Tests Only - -```bash -cd pkg/util - -# GC pressure comparison (core advantage) -go test -bench=BenchmarkGCPressure -benchmem -test.run=xxx - -# Streaming server scenario (real application) -go test -bench=BenchmarkStreamingServer -benchmem -test.run=xxx -``` - -### 3.4 Actual Performance Test Results - -Actual results from running benchmarks on Apple M2 Pro: - -**Test Environment**: -- CPU: Apple M2 Pro (12 cores) -- OS: macOS (darwin/arm64) -- Go: 1.23.0 - -#### 3.4.1 Core Performance Comparison - -| Test Scenario | bufio.Reader | BufReader | Difference | -|--------------|-------------|-----------|-----------| -| **Concurrent Network Read** | 103.2 ns/op
1027 B/op, 1 allocs | 147.6 ns/op
4 B/op, 0 allocs | Zero alloc ⭐ | -| **GC Pressure Test** | 1874 ns/op
5,576,659 mallocs
3 gc-runs | 112.7 ns/op
3,918 mallocs
2 gc-runs | **16.6x faster** ⭐⭐⭐ | -| **Streaming Server** | 374.6 ns/op
79,508 MB-alloc
134 gc-runs | 30.29 ns/op
601 MB-alloc
2 gc-runs | **12.4x faster** ⭐⭐⭐ | - -#### 3.4.2 GC Pressure Comparison (Core Finding) - -**GC Pressure Test** results best demonstrate long-term running differences: - -**bufio.Reader**: -``` -Operation Latency: 1874 ns/op -Allocation Count: 5,576,659 times (over 5 million!) -GC Runs: 3 times -Per Operation: 2 allocs/op -``` - -**BufReader**: -``` -Operation Latency: 112.7 ns/op (16.6x faster) -Allocation Count: 3,918 times (99.93% reduction) -GC Runs: 2 times -Per Operation: 0 allocs/op (zero allocation!) -``` - -**Key Metrics**: -- 🚀 **16x Throughput Improvement**: 45.7M ops/s vs 2.8M ops/s -- ⭐ **99.93% Allocation Reduction**: From 5.57 million to 3,918 times -- ✨ **Zero Allocation Operations**: 0 allocs/op vs 2 allocs/op - -#### 3.4.3 Streaming Server Scenario (Real Application) - -Simulating 100 concurrent streams, continuously reading and forwarding data: - -**bufio.Reader**: -``` -Operation Latency: 374.6 ns/op -Memory Allocation: 79,508 MB (79 GB!) -GC Runs: 134 times -Per Operation: 4 allocs/op -``` - -**BufReader**: -``` -Operation Latency: 30.29 ns/op (12.4x faster) -Memory Allocation: 601 MB (99.2% reduction) -GC Runs: 2 times (98.5% reduction!) 
-Per Operation: 0 allocs/op -``` - -**Stunning Differences**: -- 🎯 **GC Runs: 134 → 2** (98.5% reduction) -- 💾 **Memory Allocation: 79 GB → 0.6 GB** (132x reduction) -- ⚡ **Throughput: 10.1M → 117M ops/s** (11.6x improvement) - -#### 3.4.4 Long-Term Running Impact - -For streaming server scenarios, **1-hour running** estimation: - -**bufio.Reader**: -``` -Estimated Memory Allocation: ~2.8 TB -Estimated GC Runs: ~4,800 times -Cumulative GC Pause: Significant -``` - -**BufReader**: -``` -Estimated Memory Allocation: ~21 GB (133x reduction) -Estimated GC Runs: ~72 times (67x reduction) -Cumulative GC Pause: Minimal -``` - -**Usage Recommendations**: - -| Scenario | Recommended | Reason | -|----------|------------|---------| -| Simple file reading | bufio.Reader | Standard library sufficient | -| **High-concurrency network server** | **BufReader** ⭐ | **98% GC reduction** | -| **Streaming media processing** | **BufReader** ⭐ | **Zero allocation, high throughput** | -| **Long-running services** | **BufReader** ⭐ | **More stable system** | - -#### 3.4.5 Essential Reasons for Performance Improvement - -While bufio.Reader is faster in some simple scenarios, BufReader's design goals are not to be faster in all cases, but rather: - -1. **Eliminate Memory Allocation** - Avoid frequent `make([]byte, n)` in real applications -2. **Reduce GC Pressure** - Reuse memory through object pool, reducing garbage collection burden -3. **Zero-Copy Processing** - Provide `ReadRange` API for direct data manipulation -4. **Chained Buffering** - Support complex data processing patterns - -In scenarios like **Monibuca streaming server**, the value of these features far exceeds microsecond-level latency differences. 
- -**Real Impact**: When handling 1000 concurrent streaming connections: - -```go -// bufio.Reader approach -// 1000 connections × 30fps × 1024 bytes/packet = 30,720,000 allocations per second -// 1024 bytes per allocation = ~30GB/sec temporary memory allocation -// Triggers massive GC - -// BufReader approach -// 0 allocations (memory reuse) -// 90%+ GC pressure reduction -// Significantly improved system stability -``` - -**Selection Guidelines**: - -- 📁 **Simple file reading** → bufio.Reader -- 🔄 **High-concurrency network services** → BufReader (98% GC reduction) -- 💾 **Long-running services** → BufReader (zero allocation) -- 🎯 **Streaming server** → BufReader (10-20x throughput) - -## 4. Real-World Use Cases - -### 4.1 RTSP Protocol Parsing - -```go -// Use BufReader to parse RTSP requests -func parseRTSPRequest(conn net.Conn) (*RTSPRequest, error) { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - // Read request line: zero-copy, no memory allocation - requestLine, err := reader.ReadLine() - if err != nil { - return nil, err - } - - // Read headers: directly operate on memory blocks - headers, err := reader.ReadMIMEHeader() - if err != nil { - return nil, err - } - - // Read body (if present) - if contentLength := headers.Get("Content-Length"); contentLength != "" { - length, _ := strconv.Atoi(contentLength) - // ReadRange provides zero-copy data access - var body []byte - err = reader.ReadRange(length, func(chunk []byte) { - body = append(body, chunk...) 
- }) - } - - return &RTSPRequest{ - RequestLine: requestLine, - Headers: headers, - }, nil -} -``` - -### 4.2 Streaming Media Packet Parsing - -```go -// Use BufReader to parse FLV packets -func parseFLVPackets(conn net.Conn) error { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - for { - // Read packet header: 4 bytes - packetType, err := reader.ReadByte() - if err != nil { - return err - } - - // Read data size: 3 bytes big-endian - dataSize, err := reader.ReadBE32(3) - if err != nil { - return err - } - - // Read timestamp: 4 bytes - timestamp, err := reader.ReadBE32(4) - if err != nil { - return err - } - - // Skip StreamID: 3 bytes - if err := reader.Skip(3); err != nil { - return err - } - - // Read actual data: zero-copy processing - err = reader.ReadRange(int(dataSize), func(data []byte) { - // Process data directly, no copy needed - processPacket(packetType, timestamp, data) - }) - if err != nil { - return err - } - - // Skip previous tag size - if err := reader.Skip(4); err != nil { - return err - } - } -} -``` - -### 4.3 Performance-Critical Scenarios - -BufReader is particularly suitable for: - -1. **High-frequency small packet processing**: Network protocol parsing, RTP/RTCP packet handling -2. **Large data stream transmission**: Continuous reading of video/audio streams -3. **Multi-step protocol reading**: Protocols requiring step-by-step reading of different length data -4. **Low-latency requirements**: Real-time streaming media transmission, online gaming -5. **High-concurrency scenarios**: Servers with massive concurrent connections - -## 5. 
Best Practices - -### 5.1 Correct Usage Patterns - -```go -// ✅ Correct: Specify appropriate block size on creation -func goodExample(conn net.Conn) { - // Choose block size based on actual packet size - reader := util.NewBufReaderWithBufLen(conn, 16384) // 16KB blocks - defer reader.Recycle() // Ensure resource recycling - - // Use ReadRange for zero-copy - reader.ReadRange(1024, func(data []byte) { - // Process directly, don't hold reference to data - process(data) - }) -} - -// ❌ Wrong: Forget to recycle resources -func badExample1(conn net.Conn) { - reader := util.NewBufReader(conn) - // Missing defer reader.Recycle() - // Memory blocks cannot be returned to object pool -} - -// ❌ Wrong: Holding data reference -var globalData []byte - -func badExample2(conn net.Conn) { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - reader.ReadRange(1024, func(data []byte) { - // ❌ Wrong: data will be recycled after Recycle - globalData = data // Dangling reference - }) -} - -// ✅ Correct: Copy when data needs to be retained -func goodExample2(conn net.Conn) { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - var saved []byte - reader.ReadRange(1024, func(data []byte) { - // Explicitly copy when retention needed - saved = make([]byte, len(data)) - copy(saved, data) - }) - // Now safe to use saved -} -``` - -### 5.2 Block Size Selection - -```go -// Choose appropriate block size based on scenario -const ( - // Small packet protocols (e.g., RTSP, HTTP headers) - SmallPacketSize = 4 << 10 // 4KB - - // Medium data streams (e.g., audio) - MediumPacketSize = 16 << 10 // 16KB - - // Large data streams (e.g., video) - LargePacketSize = 64 << 10 // 64KB -) - -func createReaderForProtocol(conn net.Conn, protocol string) *util.BufReader { - var bufSize int - switch protocol { - case "rtsp", "http": - bufSize = SmallPacketSize - case "audio": - bufSize = MediumPacketSize - case "video": - bufSize = LargePacketSize - default: - bufSize = util.defaultBufSize 
- } - return util.NewBufReaderWithBufLen(conn, bufSize) -} -``` - -### 5.3 Error Handling - -```go -func robustRead(conn net.Conn) error { - reader := util.NewBufReader(conn) - defer func() { - // Ensure resources are recycled in all cases - reader.Recycle() - }() - - // Set timeout - conn.SetReadDeadline(time.Now().Add(5 * time.Second)) - - // Read data - data, err := reader.ReadBytes(1024) - if err != nil { - if err == io.EOF { - // Normal end - return nil - } - // Handle other errors - return fmt.Errorf("read error: %w", err) - } - - // Process data - processData(data) - return nil -} -``` - -## 6. Performance Optimization Tips - -### 6.1 Batch Processing - -```go -// ✅ Optimized: Batch reading and processing -func optimizedBatchRead(reader *util.BufReader) error { - // Read large chunk of data at once - return reader.ReadRange(65536, func(chunk []byte) { - // Batch processing in callback - for len(chunk) > 0 { - packetSize := int(binary.BigEndian.Uint32(chunk[:4])) - packet := chunk[4 : 4+packetSize] - processPacket(packet) - chunk = chunk[4+packetSize:] - } - }) -} - -// ❌ Inefficient: Read one by one -func inefficientRead(reader *util.BufReader) error { - for { - size, err := reader.ReadBE32(4) - if err != nil { - return err - } - packet, err := reader.ReadBytes(int(size)) - if err != nil { - return err - } - processPacket(packet.Buffers[0]) - } -} -``` - -### 6.2 Avoid Unnecessary Copying - -```go -// ✅ Optimized: Direct processing, no copy -func zeroCopyProcess(reader *util.BufReader) error { - return reader.ReadRange(4096, func(data []byte) { - // Operate directly on original memory - sum := 0 - for _, b := range data { - sum += int(b) - } - reportChecksum(sum) - }) -} - -// ❌ Inefficient: Unnecessary copy -func unnecessaryCopy(reader *util.BufReader) error { - mem, err := reader.ReadBytes(4096) - if err != nil { - return err - } - // Another copy performed - data := make([]byte, mem.Size) - copy(data, mem.Buffers[0]) - - sum := 0 - for _, b := range data 
{ - sum += int(b) - } - reportChecksum(sum) - return nil -} -``` - -### 6.3 Proper Resource Management - -```go -// ✅ Optimized: Use object pool to manage BufReader -type ConnectionPool struct { - readers sync.Pool -} - -func (p *ConnectionPool) GetReader(conn net.Conn) *util.BufReader { - if reader := p.readers.Get(); reader != nil { - r := reader.(*util.BufReader) - // Re-initialize - return r - } - return util.NewBufReader(conn) -} - -func (p *ConnectionPool) PutReader(reader *util.BufReader) { - reader.Recycle() // Recycle memory blocks - p.readers.Put(reader) // Recycle BufReader object itself -} - -// Use connection pool -func handleConnection(pool *ConnectionPool, conn net.Conn) { - reader := pool.GetReader(conn) - defer pool.PutReader(reader) - - // Handle connection - processConnection(reader) -} -``` - -## 7. Summary - -### 7.1 Performance Comparison Visualization - -Based on actual benchmark results (concurrent scenarios): +#### Performance Visualization ``` -📊 GC Runs Comparison (Core Advantage) ⭐⭐⭐ +📊 GC Runs Comparison (Core Advantage) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ bufio.Reader ████████████████████████████████████████████████████████████████ 134 runs BufReader █ 2 runs ← 98.5% reduction! -📊 Total Memory Allocation Comparison +📊 Total Memory Allocation ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ bufio.Reader ████████████████████████████████████████████████████████████████ 79 GB BufReader █ 0.6 GB ← 99.2% reduction! -📊 Operation Throughput Comparison +📊 Throughput Comparison ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ bufio.Reader █████ 10.1M ops/s -BufReader ████████████████████████████████████████████████████████ 117M ops/s ← 11.6x! 
+BufReader ████████████████████████████████████████████████████████ 117M ops/s ``` -**Key Metrics** (Streaming Server Scenario): -- 🎯 **GC Runs**: From 134 to 2 (98.5% reduction) -- 💾 **Memory Allocation**: From 79 GB to 0.6 GB (132x reduction) -- ⚡ **Throughput**: 11.6x improvement +### 3.3 Why Non-Contiguous Memory Is So Fast -### 7.2 Core Advantages +**Reason 1: Zero-Copy Passing** +```go +// bufio - Must copy +buf := make([]byte, 1024) +reader.Read(buf) // Copy to contiguous memory -BufReader achieves zero-copy, high-performance network data reading through: +// BufReader - Pass reference +reader.ReadRange(1024, func(chunk []byte) { + // chunk is original memory block, no copy +}) +``` -1. **Zero-Copy Architecture** - - Data read directly from network to final memory location - - Use slice views to avoid data copying - - Chained buffer supports large data processing +**Reason 2: Memory Block Reuse** +``` +bufio: Allocate → Use → GC → Reallocate → ... +BufReader: Allocate → Use → Return to pool → Reuse from pool → ... + ↑ Same memory block reused repeatedly, no GC +``` -2. **Memory Reuse Mechanism** - - GoMem object pool reuses memory blocks - - Active memory management reduces GC pressure - - Configurable block sizes adapt to different scenarios +**Reason 3: Multi-Subscriber Sharing** +``` +Traditional: 1 packet → Copy 10 times → 10 subscribers +BufReader: 1 packet → Pass reference → 10 subscribers share + ↑ Only 1 memory block, all 10 subscribers reference it +``` -3. **Significant Performance Improvement** (in concurrent scenarios) - - GC runs reduced by 98.5% (134 → 2) - - Memory allocation reduced by 99.2% (79 GB → 0.6 GB) - - Throughput improved by 10-20x - - Significantly improved system stability +## 4. 
Usage Guide -### 7.3 Ideal Use Cases +### 4.1 Basic Usage -BufReader is particularly suitable for: +```go +func handleConnection(conn net.Conn) { + // Create BufReader + reader := util.NewBufReader(conn) + defer reader.Recycle() // Return all blocks to pool + + // Zero-copy read and process + reader.ReadRange(4096, func(chunk []byte) { + // chunk is non-contiguous memory block + // Process directly, no copy needed + processChunk(chunk) + }) +} +``` -- ✅ High-performance network servers -- ✅ Streaming media data processing -- ✅ Real-time protocol parsing -- ✅ Large data stream transmission -- ✅ Low-latency requirements -- ✅ High-concurrency environments +### 4.2 Real-World Use Cases -Not suitable for: +**Scenario 1: Protocol Parsing** -- ❌ Simple file reading (standard library sufficient) -- ❌ Single small data reads -- ❌ Performance-insensitive scenarios +```go +// Parse FLV packet (header + data) +func parseFLV(reader *BufReader) { + // Read packet type (1 byte) + packetType, _ := reader.ReadByte() + + // Read data size (3 bytes) + dataSize, _ := reader.ReadBE32(3) + + // Skip timestamp etc (7 bytes) + reader.Skip(7) + + // Zero-copy read data (may span multiple non-contiguous blocks) + reader.ReadRange(int(dataSize), func(chunk []byte) { + // chunk may be complete data or partial + // Parse block by block, no need to wait for complete data + parseDataChunk(packetType, chunk) + }) +} +``` -### 7.4 Choosing Between bufio.Reader and BufReader +**Scenario 2: High-Concurrency Forwarding** -| Scenario | Recommended | -|----------|------------| -| Simple file reading | bufio.Reader | -| Low-frequency network reads | bufio.Reader | -| High-performance network server | BufReader | -| Streaming media processing | BufReader | -| Protocol parsers | BufReader | -| Zero-copy requirements | BufReader | -| Memory-sensitive scenarios | BufReader | +```go +// Read from one source, forward to multiple targets +func relay(source *BufReader, targets []io.Writer) { + 
source.ReadRange(8192, func(chunk []byte) {
+        // All targets share the same memory block
+        for _, target := range targets {
+            target.Write(chunk) // Zero-copy forwarding
+        }
+    })
+}
+```

+**Scenario 3: Streaming Server**
+
+```go
+// Receive RTSP stream and distribute to subscribers
+type Stream struct {
+    reader      *BufReader
+    subscribers []*Subscriber
+}
+
+func (s *Stream) Process() {
+    s.reader.ReadRange(65536, func(frame []byte) {
+        // frame may be part of video frame (non-contiguous)
+        // Send directly to all subscribers
+        for _, sub := range s.subscribers {
+            sub.WriteFrame(frame) // Shared memory, zero-copy
+        }
+    })
+}
+```
+
+### 4.3 Best Practices
+
+**✅ Correct Usage**:
+
+```go
+// 1. Always recycle resources
+reader := util.NewBufReader(conn)
+defer reader.Recycle()
+
+// 2. Process directly in callback, don't save references
+reader.ReadRange(1024, func(data []byte) {
+    processData(data) // ✅ Process immediately
+})
+
+// 3. Explicitly copy when retention needed
+var saved []byte
+reader.ReadRange(1024, func(data []byte) {
+    saved = append(saved, data...) // ✅ Explicit copy
+})
+```
+
+**❌ Wrong Usage**:
+
+```go
+// ❌ Don't save references
+var dangling []byte
+reader.ReadRange(1024, func(data []byte) {
+    dangling = data // Wrong: data will be recycled
+})
+// dangling is now a dangling reference!
+ +// ❌ Don't forget to recycle +reader := util.NewBufReader(conn) +// Missing defer reader.Recycle() +// Memory blocks cannot be returned to pool +``` + +### 4.4 Performance Optimization Tips + +**Tip 1: Batch Processing** + +```go +// ✅ Optimized: Read multiple packets at once +reader.ReadRange(65536, func(chunk []byte) { + // One chunk may contain multiple packets + for len(chunk) >= 4 { + size := int(binary.BigEndian.Uint32(chunk[:4])) + packet := chunk[4 : 4+size] + processPacket(packet) + chunk = chunk[4+size:] + } +}) +``` + +**Tip 2: Choose Appropriate Block Size** + +```go +// Choose based on application scenario +const ( + SmallPacket = 4 << 10 // 4KB - RTSP/HTTP + MediumPacket = 16 << 10 // 16KB - Audio streams + LargePacket = 64 << 10 // 64KB - Video streams +) + +reader := util.NewBufReaderWithBufLen(conn, LargePacket) +``` + +## 5. Summary + +### Core Innovation: Non-Contiguous Memory Buffering + +BufReader's core is not "better buffering" but **fundamentally changing the memory layout model**: + +``` +Traditional thinking: Data must be in contiguous memory +BufReader: Data can be scattered across blocks, passed by reference + +Result: +✓ Zero-copy: No need to reassemble into contiguous memory +✓ Zero allocation: Memory blocks reused from object pool +✓ Zero GC pressure: No temporary objects created +``` + +### Key Advantages + +| Feature | Implementation | Performance Impact | +|---------|---------------|-------------------| +| **Zero-Copy** | Pass memory block references | No copy overhead | +| **Zero Allocation** | Object pool reuse | 98.5% GC reduction | +| **Multi-Subscriber Sharing** | Same block referenced multiple times | 10x+ memory savings | +| **Flexible Block Sizes** | Adapt to network fluctuations | No reassembly needed | + +### Ideal Use Cases + +| Scenario | Recommended | Reason | +|----------|------------|---------| +| **High-concurrency network servers** | BufReader ⭐ | 98% GC reduction, 10x+ throughput | +| **Stream forwarding** | 
BufReader ⭐ | Zero-copy multicast, memory sharing | +| **Protocol parsers** | BufReader ⭐ | Parse block by block, no complete packet needed | +| **Long-running services** | BufReader ⭐ | Stable system, minimal GC impact | +| Simple file reading | bufio.Reader | Standard library sufficient | + +### Key Points Remember when using BufReader: -1. **Always call Recycle()**: Ensure memory blocks are returned to object pool -2. **Don't hold data references**: Data in ReadRange callback will be recycled -3. **Choose appropriate block size**: Adjust based on actual packet size -4. **Leverage ReadRange**: Achieve true zero-copy processing -5. **Use with GoMem**: Fully leverage memory reuse advantages +1. **Accept non-contiguous data**: Process each block via callback +2. **Don't hold references**: Data recycled after callback returns +3. **Leverage ReadRange**: This is the core zero-copy API +4. **Must call Recycle()**: Return memory blocks to pool -Through the combination of BufReader and GoMem, Monibuca achieves high-performance network data processing, providing solid infrastructure support for streaming media servers. 
+### Performance Data + +**Streaming Server (100 concurrent streams, continuous running)**: + +``` +1-hour running estimation: + +bufio.Reader (Contiguous Memory): +- Allocates 2.8 TB memory +- Triggers 4,800 GCs +- Frequent system pauses + +BufReader (Non-Contiguous Memory): +- Allocates 21 GB memory (133x less) +- Triggers 72 GCs (67x less) +- Almost no GC impact +``` + +### Testing and Documentation + +**Run Tests**: +```bash +sh scripts/benchmark_bufreader.sh +``` + +**Complete Documentation**: +- Chinese: `doc_CN/bufreader_analysis.md` +- English: `doc/bufreader_analysis.md` +- Non-Contiguous Memory Guide: `doc/bufreader_non_contiguous_buffer.md` ## References -- [GoMem Project](https://github.com/langhuihui/gomem) -- [Monibuca v5 Documentation](https://m7s.live) -- [Object Reuse Technology Deep Dive](./arch/reuse.md) -- Go standard library `bufio` package source code -- Go standard library `sync.Pool` documentation +- [GoMem Project](https://github.com/langhuihui/gomem) - Memory object pool implementation +- [Monibuca v5](https://m7s.live) - Streaming media server +- Test Code: `pkg/util/buf_reader_benchmark_test.go` +--- + +**Core Idea**: Eliminate traditional contiguous buffer copying overhead through non-contiguous memory block chains and zero-copy reference passing, achieving high-performance network data processing. diff --git a/doc_CN/bufreader_analysis.md b/doc_CN/bufreader_analysis.md index db52227..e83cc7a 100644 --- a/doc_CN/bufreader_analysis.md +++ b/doc_CN/bufreader_analysis.md @@ -1,1040 +1,697 @@ -# BufReader:零拷贝网络读取的内存管理方案 +# BufReader:基于非连续内存缓冲的零拷贝网络读取方案 ## 目录 -- [1. 标准库 bufio.Reader 的内存分配问题](#1-标准库-bufioreader-的内存分配问题) -- [2. BufReader:零拷贝的解决方案](#2-bufreader零拷贝的解决方案) -- [3. 性能基准测试](#3-性能基准测试) -- [4. 实际应用场景](#4-实际应用场景) -- [5. 最佳实践](#5-最佳实践) -- [6. 性能优化技巧](#6-性能优化技巧) -- [7. 总结](#7-总结) +- [1. 问题:传统连续内存缓冲的瓶颈](#1-问题传统连续内存缓冲的瓶颈) +- [2. 核心方案:非连续内存缓冲传递机制](#2-核心方案非连续内存缓冲传递机制) +- [3. 性能验证](#3-性能验证) +- [4. 
使用指南](#4-使用指南) ## TL;DR (核心要点) -如果你时间有限,以下是最重要的结论: +**核心创新**:非连续内存缓冲传递机制 +- 数据以**内存块链表**形式存储,非连续布局 +- 通过 **ReadRange 回调**逐块传递引用,零拷贝 +- 内存块从**对象池复用**,避免分配和 GC -**BufReader 的核心优势**(并发场景): -- ⭐ **GC 次数减少 98.5%**:134 次 → 2 次(流媒体场景) -- 🚀 **内存分配减少 99.93%**:557 万次 → 3918 次 -- 🔄 **吞吐量提升 10-20 倍**:零分配 + 内存复用 - -**关键数据**: +**性能数据**(流媒体服务器,100 并发流): ``` -流媒体服务器场景(100 并发流): -bufio.Reader: 79 GB 分配,134 次 GC -BufReader: 0.6 GB 分配,2 次 GC +bufio.Reader: 79 GB 分配,134 次 GC,374.6 ns/op +BufReader: 0.6 GB 分配,2 次 GC,30.29 ns/op + +结果:GC 减少 98.5%,吞吐量提升 11.6 倍 ``` -**适用场景**: -- ✅ 高并发网络服务器 -- ✅ 流媒体数据处理 -- ✅ 长期运行服务(7x24) - -**快速测试**: -```bash -sh scripts/benchmark_bufreader.sh -``` +**适用场景**:高并发网络服务器、流媒体处理、长期运行服务 --- -## 引言 +## 1. 问题:传统连续内存缓冲的瓶颈 -在高性能网络编程中,频繁的内存分配和拷贝是性能瓶颈的主要来源。Go 标准库提供的 `bufio.Reader` 虽然提供了缓冲读取功能,但在处理网络数据流时仍然存在大量的内存分配和拷贝操作。本文将深入分析这一问题,并介绍 Monibuca 项目中实现的 `BufReader`,展示如何通过 GoMem 内存分配器实现零拷贝的高性能网络数据读取。 +### 1.1 bufio.Reader 的连续内存模型 -## 1. 标准库 bufio.Reader 的内存分配问题 - -### 1.1 bufio.Reader 的工作原理 - -`bufio.Reader` 采用固定大小的内部缓冲区来减少系统调用次数: +标准库 `bufio.Reader` 使用**固定大小的连续内存缓冲区**: ```go type Reader struct { - buf []byte // 固定大小的缓冲区 - rd io.Reader // 底层 reader - r, w int // 读写位置 + buf []byte // 单一连续缓冲区(如 4KB) + r, w int // 读写指针 } func (b *Reader) Read(p []byte) (n int, err error) { - // 1. 如果缓冲区为空,从底层 reader 读取数据填充缓冲区 - if b.r == b.w { - n, err = b.rd.Read(b.buf) // 数据拷贝到内部缓冲区 - b.w += n - } - - // 2. 
从缓冲区拷贝数据到目标切片 - n = copy(p, b.buf[b.r:b.w]) // 再次拷贝数据 - b.r += n + // 从连续缓冲区拷贝到目标 + n = copy(p, b.buf[b.r:b.w]) // 必须拷贝 return } ``` -### 1.2 内存分配问题分析 +**连续内存的代价**: -使用 `bufio.Reader` 读取网络数据时存在以下问题: +``` +读取 16KB 数据(缓冲区 4KB): -**问题 1:多次内存拷贝** +网络 → bufio 缓冲区 → 用户缓冲区 + ↓ (4KB 连续) ↓ +第1次 [████] → 拷贝到 result[0:4KB] +第2次 [████] → 拷贝到 result[4KB:8KB] +第3次 [████] → 拷贝到 result[8KB:12KB] +第4次 [████] → 拷贝到 result[12KB:16KB] -```mermaid -sequenceDiagram - participant N as 网络 Socket - participant B as bufio.Reader 内部缓冲区 - participant U as 用户缓冲区 - participant A as 应用层处理 - - N->>B: 系统调用读取数据(第1次拷贝) - Note over B: 数据存储在固定缓冲区 - B->>U: copy() 拷贝到用户缓冲区(第2次拷贝) - Note over U: 用户获取数据副本 - U->>A: 传递给应用层(可能第3次拷贝) - Note over A: 应用层处理数据 +总计:4 次网络读取 + 4 次内存拷贝 +每次分配 result (16KB 连续内存) ``` -每次读取操作都需要至少两次内存拷贝: -1. 从网络 socket 拷贝到 `bufio.Reader` 的内部缓冲区 -2. 从内部缓冲区拷贝到用户提供的切片 +### 1.2 高并发场景的问题 -**问题 2:固定缓冲区限制** +在流媒体服务器(100 个并发连接,每个 30fps): ```go -// bufio.Reader 使用固定大小的缓冲区 -reader := bufio.NewReaderSize(conn, 4096) // 固定 4KB - -// 读取大块数据时需要多次操作 -data := make([]byte, 16384) // 需要读取 16KB -for total := 0; total < 16384; { - n, err := reader.Read(data[total:]) // 需要循环读取 4 次 - total += n -} -``` - -**问题 3:频繁的内存分配** - -```go -// 每次读取都需要分配新的切片 -func processPackets(reader *bufio.Reader) { +// 典型的处理模式 +func handleStream(conn net.Conn) { + reader := bufio.NewReaderSize(conn, 4096) for { - // 为每个数据包分配新内存 - header := make([]byte, 4) // 分配 1 - reader.Read(header) + // 为每个数据包分配连续缓冲区 + packet := make([]byte, 1024) // 分配 1 + n, _ := reader.Read(packet) // 拷贝 1 - size := binary.BigEndian.Uint32(header) - payload := make([]byte, size) // 分配 2 - reader.Read(payload) - - // 处理完后,内存被 GC 回收 - processPayload(payload) - // 下次循环重新分配... + // 转发给多个订阅者 + for _, sub := range subscribers { + data := make([]byte, n) // 分配 2-N + copy(data, packet[:n]) // 拷贝 2-N + sub.Write(data) + } } } + +// 性能影响: +// 100 连接 × 30fps × (1 + 订阅者数) 次分配 = 大量临时内存 +// 触发频繁 GC,系统不稳定 ``` -### 1.3 性能影响 +**核心问题**: +1. 必须维护连续内存布局 → 频繁拷贝 +2. 
每个数据包分配新缓冲区 → 大量临时对象 +3. 转发需要多次拷贝 → CPU 浪费在内存操作上 -在高频率网络数据处理场景下,这些问题会导致: - -1. **CPU 开销增加**:频繁的 `copy()` 操作消耗 CPU 资源 -2. **GC 压力上升**:大量临时内存分配增加垃圾回收负担 -3. **延迟增加**:每次内存分配和拷贝都增加处理延迟 -4. **吞吐量下降**:内存操作成为瓶颈,限制整体吞吐量 - -## 2. BufReader:零拷贝的解决方案 +## 2. 核心方案:非连续内存缓冲传递机制 ### 2.1 设计理念 -`BufReader` 基于以下核心理念设计: +BufReader 采用**非连续内存块链表**: -1. **零拷贝读取**:直接从网络读取到最终的内存位置,避免中间拷贝 -2. **内存复用**:通过 GoMem 分配器复用内存块,避免频繁分配 -3. **链式缓冲**:使用多个内存块组成链表,而非单一固定缓冲区 -4. **按需分配**:根据实际读取量动态调整内存使用 +``` +不再要求数据在连续内存中,而是: +1. 数据分散在多个内存块中(链表) +2. 每个块独立管理和复用 +3. 通过引用传递,不拷贝数据 +``` -### 2.2 核心数据结构 +**核心数据结构**: ```go type BufReader struct { - Allocator *ScalableMemoryAllocator // 可扩展的内存分配器 - buf MemoryReader // 内存块链表读取器 - totalRead int // 总读取字节数 - BufLen int // 每次读取的块大小 - Mouth chan []byte // 数据输入通道 - feedData func() error // 数据填充函数 + Allocator *ScalableMemoryAllocator // 对象池分配器 + buf MemoryReader // 内存块链表 } -// MemoryReader 管理多个内存块 type MemoryReader struct { - *Memory // 内存管理器 - Buffers [][]byte // 内存块链表 - Size int // 总大小 - Length int // 可读长度 + Buffers [][]byte // 多个内存块,非连续! 
+ Size int // 总大小 + Length int // 可读长度 } ``` -### 2.3 工作流程 +### 2.2 非连续内存缓冲模型 -#### 2.3.1 零拷贝数据读取流程 +#### 连续 vs 非连续对比 + +``` +bufio.Reader(连续内存): +┌─────────────────────────────────┐ +│ 4KB 固定缓冲区 │ +│ [已读][可用] │ +└─────────────────────────────────┘ +- 必须拷贝到连续的目标缓冲区 +- 固定大小限制 +- 已读部分浪费空间 + +BufReader(非连续内存): +┌──────┐ ┌──────┐ ┌────────┐ ┌──────┐ +│Block1│→│Block2│→│ Block3 │→│Block4│ +│ 512B │ │ 1KB │ │ 2KB │ │ 3KB │ +└──────┘ └──────┘ └────────┘ └──────┘ +- 直接传递每个块的引用(零拷贝) +- 灵活的块大小 +- 处理完立即回收 +``` + +#### 内存块链表的工作流程 ```mermaid sequenceDiagram - participant N as 网络 Socket - participant A as ScalableMemoryAllocator + participant N as 网络 + participant P as 对象池 participant B as BufReader.buf participant U as 用户代码 - U->>B: Read(n) - B->>B: 检查缓冲区是否有数据 - alt 缓冲区无数据 - B->>A: 申请内存块 - Note over A: 从对象池获取或分配新块 - A-->>B: 返回内存块引用 - B->>N: 直接读取到内存块 - Note over N,B: 零拷贝:数据直接写入最终位置 - end - B-->>U: 返回内存块的切片视图 - Note over U: 用户直接使用,无需拷贝 - U->>U: 处理数据 - U->>A: 回收内存块(可选) - Note over A: 内存块回到对象池等待复用 + N->>P: 第1次读取(返回 512B) + P-->>B: Block1 (512B) - 从池获取或新建 + B->>B: Buffers = [Block1] + + N->>P: 第2次读取(返回 1KB) + P-->>B: Block2 (1KB) - 从池复用 + B->>B: Buffers = [Block1, Block2] + + N->>P: 第3次读取(返回 2KB) + P-->>B: Block3 (2KB) + B->>B: Buffers = [Block1, Block2, Block3] + + U->>B: ReadRange(4096) + B->>U: yield(Block1) - 传递引用 + B->>U: yield(Block2) - 传递引用 + B->>U: yield(Block3) - 传递引用 + B->>U: yield(Block4[0:512]) + + U->>B: 数据处理完成 + B->>P: 回收 Block1, Block2, Block3, Block4 + Note over P: 内存块回到池中等待复用 ``` -#### 2.3.2 内存块管理流程 +### 2.3 零拷贝传递:ReadRange API -```mermaid -graph TD - A[开始读取] --> B{buf 有数据?} - B -->|是| C[直接返回数据视图] - B -->|否| D[调用 feedData] - D --> E[Allocator.Read 申请内存] - E --> F{对象池有空闲块?} - F -->|是| G[复用现有内存块] - F -->|否| H[分配新内存块] - G --> I[从网络读取数据] - H --> I - I --> J[追加到 buf.Buffers] - J --> K[更新 Size 和 Length] - K --> C - C --> L[用户读取数据] - L --> M{数据已处理完?} - M -->|是| N[ClipFront 回收前面的块] - N --> O[Allocator.Free 归还对象池] - O --> P[结束] - M -->|否| A -``` - -### 2.4 核心实现分析 - -#### 
2.4.1 初始化和内存分配 +**核心 API**: ```go -func NewBufReader(reader io.Reader) *BufReader { - return NewBufReaderWithBufLen(reader, defaultBufSize) -} +func (r *BufReader) ReadRange(n int, yield func([]byte)) error +``` -func NewBufReaderWithBufLen(reader io.Reader, bufLen int) *BufReader { - r := &BufReader{ - Allocator: NewScalableMemoryAllocator(bufLen), // 创建分配器 - BufLen: bufLen, +**工作原理**: + +```go +// 内部实现(简化版) +func (r *BufReader) ReadRange(n int, yield func([]byte)) error { + remaining := n + + // 遍历内存块链表 + for _, block := range r.buf.Buffers { + if remaining <= 0 { + break + } + + if len(block) <= remaining { + // 整块传递 + yield(block) // 零拷贝:直接传递引用! + remaining -= len(block) + } else { + // 传递部分 + yield(block[:remaining]) + remaining = 0 + } + } + + // 回收已处理的块 + r.recycleFront() + return nil +} +``` + +**使用示例**: + +```go +// 读取 4096 字节数据 +reader.ReadRange(4096, func(chunk []byte) { + // chunk 是原始内存块的引用 + // 可能被调用多次,每次接收不同大小的块 + // 例如:512B, 1KB, 2KB, 512B + + processData(chunk) // 直接处理,零拷贝! +}) + +// 特点: +// - 无需分配目标缓冲区 +// - 无需拷贝数据 +// - 每个 chunk 处理完后自动回收 +``` + +### 2.4 真实网络场景的优势 + +**场景:从网络读取 10KB 数据,网络每次返回 500B-2KB** + +``` +bufio.Reader(连续内存方案): +1. 读取 2KB 到内部缓冲区(连续) +2. 拷贝 2KB 到用户缓冲区 ← 拷贝 +3. 读取 1.5KB 到内部缓冲区 +4. 拷贝 1.5KB 到用户缓冲区 ← 拷贝 +5. 读取 2KB... +6. 拷贝 2KB... ← 拷贝 +... 重复 ... +总计:多次网络读取 + 多次内存拷贝 +必须分配 10KB 连续缓冲区 + +BufReader(非连续内存方案): +1. 读取 2KB → Block1,追加到链表 +2. 读取 1.5KB → Block2,追加到链表 +3. 读取 2KB → Block3,追加到链表 +4. 读取 2KB → Block4,追加到链表 +5. 读取 2.5KB → Block5,追加到链表 +6. 
ReadRange(10KB): + → yield(Block1) - 2KB + → yield(Block2) - 1.5KB + → yield(Block3) - 2KB + → yield(Block4) - 2KB + → yield(Block5) - 2.5KB +总计:多次网络读取 + 0 次内存拷贝 +无需分配连续内存,逐块处理 +``` + +### 2.5 实际应用:流媒体转发 + +**问题场景**:100 个并发流,每个流转发给 10 个订阅者 + +**传统方式**(连续内存): + +```go +func forwardStream_Traditional(reader *bufio.Reader, subscribers []net.Conn) { + packet := make([]byte, 4096) // 分配 1:连续内存 + n, _ := reader.Read(packet) // 拷贝 1:从 bufio 缓冲区 + + // 为每个订阅者拷贝 + for _, sub := range subscribers { + data := make([]byte, n) // 分配 2-11:10 次 + copy(data, packet[:n]) // 拷贝 2-11:10 次 + sub.Write(data) + } +} +// 每个数据包:11 次分配 + 11 次拷贝 +// 100 并发 × 30fps × 11 = 33,000 次分配/秒 +``` + +**BufReader 方式**(非连续内存): + +```go +func forwardStream_BufReader(reader *BufReader, subscribers []net.Conn) { + reader.ReadRange(4096, func(chunk []byte) { + // chunk 是原始内存块引用,可能非连续 + // 所有订阅者共享同一块内存! + + for _, sub := range subscribers { + sub.Write(chunk) // 直接发送引用,零拷贝 + } + }) +} +// 每个数据包:0 次分配 + 0 次拷贝 +// 100 并发 × 30fps × 0 = 0 次分配/秒 +``` + +**性能对比**: +- 分配次数:33,000/秒 → 0/秒 +- 内存拷贝:33,000/秒 → 0/秒 +- GC 压力:高 → 极低 + +### 2.6 内存块的生命周期 + +```mermaid +stateDiagram-v2 + [*] --> 从对象池获取 + 从对象池获取 --> 读取网络数据 + 读取网络数据 --> 追加到链表 + 追加到链表 --> 传递给用户 + 传递给用户 --> 用户处理 + 用户处理 --> 回收到对象池 + 回收到对象池 --> 从对象池获取 + + note right of 从对象池获取 + 复用已有内存块 + 避免 GC + end note + + note right of 传递给用户 + 传递引用,零拷贝 + 可能传递给多个订阅者 + end note + + note right of 回收到对象池 + 主动回收 + 立即可复用 + end note +``` + +**关键点**: +1. 内存块在对象池中**循环复用**,不经过 GC +2. 传递引用而非拷贝数据,实现零拷贝 +3. 
处理完立即回收,内存占用最小化 + +### 2.7 核心代码实现 + +```go +// 创建 BufReader +func NewBufReader(reader io.Reader) *BufReader { + return &BufReader{ + Allocator: NewScalableMemoryAllocator(16384), // 对象池 feedData: func() error { - // 关键:从分配器读取,直接填充到内存块 + // 从对象池获取内存块,直接读取网络数据 buf, err := r.Allocator.Read(reader, r.BufLen) if err != nil { return err } - n := len(buf) - r.totalRead += n - // 直接追加内存块引用,无需拷贝 + // 追加到链表(只是添加引用) r.buf.Buffers = append(r.buf.Buffers, buf) - r.buf.Size += n - r.buf.Length += n + r.buf.Length += len(buf) return nil }, } - r.buf.Memory = &Memory{} - return r -} -``` - -**零拷贝关键点**: -- `Allocator.Read()` 直接从 `io.Reader` 读取到分配的内存块 -- 返回的 `buf` 是实际存储数据的内存块引用 -- `append(r.buf.Buffers, buf)` 只是追加引用,没有数据拷贝 - -#### 2.4.2 读取操作 - -```go -func (r *BufReader) ReadByte() (b byte, err error) { - // 如果缓冲区为空,触发数据填充 - for r.buf.Length == 0 { - if err = r.feedData(); err != nil { - return - } - } - // 从内存块链表中读取,无需拷贝 - return r.buf.ReadByte() } +// 零拷贝读取 func (r *BufReader) ReadRange(n int, yield func([]byte)) error { - for r.recycleFront(); n > 0 && err == nil; err = r.feedData() { - if r.buf.Length > 0 { - if r.buf.Length >= n { - // 直接传递内存块的切片视图,无需拷贝 - r.buf.RangeN(n, yield) - return - } - n -= r.buf.Length - r.buf.Range(yield) - } + for r.buf.Length < n { + r.feedData() // 从网络读取更多数据 } - return -} -``` - -**零拷贝体现**: -- `yield` 回调函数接收的是内存块的切片视图 -- 用户代码直接操作原始内存块,没有中间拷贝 -- 读取完成后,已读取的块自动回收 - -#### 2.4.3 内存回收 - -```go -func (r *BufReader) recycleFront() { - // 清理已读取的内存块 - r.buf.ClipFront(r.Allocator.Free) + + // 逐块传递引用 + for _, block := range r.buf.Buffers { + yield(block) // 零拷贝传递 + } + + // 回收已读取的块 + r.recycleFront() + return nil } +// 回收内存块到对象池 func (r *BufReader) Recycle() { - r.buf = MemoryReader{} if r.Allocator != nil { - // 将所有内存块归还给分配器 - r.Allocator.Recycle() - } - if r.Mouth != nil { - close(r.Mouth) + r.Allocator.Recycle() // 所有块归还对象池 } } ``` -### 2.5 与 bufio.Reader 的对比 +## 3. 
性能验证 -```mermaid -graph LR - subgraph "bufio.Reader(多次拷贝)" - A1[网络] -->|系统调用| B1[内核缓冲区] - B1 -->|拷贝1| C1[bufio 缓冲区] - C1 -->|拷贝2| D1[用户切片] - D1 -->|拷贝3?| E1[应用层] - end - - subgraph "BufReader(零拷贝)" - A2[网络] -->|系统调用| B2[内核缓冲区] - B2 -->|直接读取| C2[GoMem 内存块] - C2 -->|切片视图| D2[用户代码] - D2 -->|回收| C2 - C2 -->|复用| C2 - end -``` +### 3.1 测试设计 -| 特性 | bufio.Reader | BufReader | -|------|-------------|-----------| -| 内存拷贝次数 | 2-3 次 | 0 次(切片视图) | -| 缓冲区模式 | 固定大小单缓冲区 | 可变大小链式缓冲区 | -| 内存分配 | 每次读取可能分配 | 对象池复用 | -| 内存回收 | GC 自动回收 | 主动归还对象池 | -| 大块数据处理 | 需要多次操作 | 单次追加到链表 | -| GC 压力 | 高 | 极低 | +**真实网络模拟**:每次读取返回随机大小(64-2048 字节),模拟真实网络波动 -## 3. 性能基准测试 +**核心测试场景**: +1. **并发网络连接读取** - 模拟 100+ 并发连接 +2. **GC 压力测试** - 展示长期运行差异 +3. **流媒体服务器** - 真实业务场景(100 流 × 转发) -### 3.1 测试场景设计 +### 3.2 性能测试结果 -#### 3.1.1 真实网络模拟 +**测试环境**:Apple M2 Pro, Go 1.23.0 -为了让基准测试更加贴近实际应用场景,我们实现了一个模拟真实网络行为的 `mockNetworkReader`。 +#### GC 压力测试(核心对比) -**真实网络的特性**: +| 指标 | bufio.Reader | BufReader | 改善 | +|------|-------------|-----------|------| +| 操作延迟 | 1874 ns/op | 112.7 ns/op | **16.6x 快** | +| 内存分配次数 | 5,576,659 | 3,918 | **减少 99.93%** | +| 每次操作 | 2 allocs/op | 0 allocs/op | **零分配** | +| 吞吐量 | 2.8M ops/s | 45.7M ops/s | **16x 提升** | -在真实的网络读取场景中,每次 `Read()` 调用返回的数据长度是**不确定**的,受多种因素影响: +#### 流媒体服务器场景 -- TCP 接收窗口大小 -- 网络延迟和带宽 -- 操作系统缓冲区状态 -- 网络拥塞情况 -- 网络质量波动 +| 指标 | bufio.Reader | BufReader | 改善 | +|------|-------------|-----------|------| +| 操作延迟 | 374.6 ns/op | 30.29 ns/op | **12.4x 快** | +| 内存分配 | 79,508 MB | 601 MB | **减少 99.2%** | +| **GC 次数** | **134** | **2** | **减少 98.5%** ⭐ | +| 吞吐量 | 10.1M ops/s | 117M ops/s | **11.6x 提升** | -**模拟实现**: - -```go -type mockNetworkReader struct { - data []byte - offset int - rng *rand.Rand - minChunk int // 最小块大小 - maxChunk int // 最大块大小 -} - -func (m *mockNetworkReader) Read(p []byte) (n int, err error) { - // 每次返回 minChunk 到 maxChunk 之间的随机长度数据 - chunkSize := m.minChunk + m.rng.Intn(m.maxChunk-m.minChunk+1) - n = copy(p[:chunkSize], m.data[m.offset:]) - m.offset += n - 
return n, nil -} -``` - -**不同网络状况模拟**: - -| 网络状况 | 数据块范围 | 实际场景 | -|---------|-----------|---------| -| 良好网络 | 1024-4096 字节 | 稳定的局域网、优质网络环境 | -| 一般网络 | 256-2048 字节 | 普通互联网连接 | -| 差网络 | 64-512 字节 | 高延迟、小 TCP 窗口 | -| 极差网络 | 1-128 字节 | 移动网络、严重拥塞 | - -这种模拟让基准测试结果更加真实可靠。 - -#### 3.1.2 测试场景列表 - -我们聚焦以下核心场景: - -1. **并发网络连接读取** - 展示零分配特性 -2. **并发协议解析** - 模拟真实应用 -3. **GC 压力测试** - 展示长期运行优势 ⭐ -4. **流媒体服务器场景** - 真实业务场景 ⭐ - -### 3.2 基准测试设计 - -#### 核心测试场景 - -基准测试聚焦于**并发网络场景**和**GC 压力**对比: - -**1. 并发网络连接读取** -- 模拟 100+ 并发连接持续读取数据 -- 每次读取 1KB 数据包并处理 -- bufio: 每次分配新缓冲区(`make([]byte, 1024)`) -- BufReader: 零拷贝处理(`ReadRange`) - -**2. 并发协议解析** -- 模拟流媒体服务器解析协议包 -- 读取包头(4字节)+ 数据内容 -- 对比内存分配策略差异 - -**3. GC 压力测试**(⭐ 核心) -- 持续并发读取和处理 -- 统计 GC 次数、内存分配总量、分配次数 -- 展示长期运行下的差异 - -**4. 流媒体服务器场景**(⭐ 真实应用) -- 模拟 100 个并发流 -- 每个流读取并转发数据给订阅者 -- 真实应用场景完整对比 - -#### 关键测试逻辑 - -**并发读取**: -```go -// bufio.Reader - 每次分配 -buf := make([]byte, 1024) // 1KB 分配 -n, _ := reader.Read(buf) -processData(buf[:n]) - -// BufReader - 零拷贝 -reader.ReadRange(1024, func(data []byte) { - processData(data) // 直接使用,无分配 -}) -``` - -**GC 统计**: -```go -// 记录 GC 统计 -var beforeGC, afterGC runtime.MemStats -runtime.ReadMemStats(&beforeGC) - -b.RunParallel(func(pb *testing.PB) { - // 并发测试... 
-}) - -runtime.ReadMemStats(&afterGC) -b.ReportMetric(float64(afterGC.NumGC-beforeGC.NumGC), "gc-runs") -b.ReportMetric(float64(afterGC.TotalAlloc-beforeGC.TotalAlloc)/1024/1024, "MB-alloc") -``` - -完整测试代码见:`pkg/util/buf_reader_benchmark_test.go` - -### 3.3 运行基准测试 - -我们提供了完整的基准测试代码(`pkg/util/buf_reader_benchmark_test.go`)和便捷的测试脚本。 - -#### 方法一:使用测试脚本(推荐) - -```bash -# 运行完整的基准测试套件 -sh scripts/benchmark_bufreader.sh -``` - -这个脚本会依次运行所有测试并输出友好的结果。 - -#### 方法二:手动运行测试 - -```bash -cd pkg/util - -# 运行所有基准测试 -go test -bench=BenchmarkBuf -benchmem -benchtime=2s -test.run=xxx - -# 运行特定测试 -go test -bench=BenchmarkMemoryAllocation -benchmem -benchtime=2s -test.run=xxx - -# 对比测试结果(需要安装 benchstat) -go test -bench=BenchmarkBufioReader_SmallChunks -benchmem -count=5 > bufio.txt -go test -bench=BenchmarkBufReader_SmallChunks -benchmem -count=5 > bufreader.txt -benchstat bufio.txt bufreader.txt -``` - -#### 方法三:只运行关键测试 - -```bash -cd pkg/util - -# 内存分配场景对比(核心优势) -go test -bench=BenchmarkMemoryAllocation -benchmem -test.run=xxx - -# 协议解析场景对比(实际应用) -go test -bench=BenchmarkProtocolParsing -benchmem -test.run=xxx -``` - -### 3.4 实际性能测试结果 - -在 Apple M2 Pro 上运行基准测试的实际结果: - -**测试环境**: -- CPU: Apple M2 Pro (12 核) -- OS: macOS (darwin/arm64) -- Go: 1.23.0 - -#### 3.4.1 核心性能对比 - -| 测试场景 | bufio.Reader | BufReader | 差异 | -|---------|-------------|-----------|------| -| **并发网络读取** | 103.2 ns/op
1027 B/op, 1 allocs | 147.6 ns/op
4 B/op, 0 allocs | 零分配 ⭐ | -| **GC 压力测试** | 1874 ns/op
5,576,659 mallocs
3 gc-runs | 112.7 ns/op
3,918 mallocs
2 gc-runs | **16.6x 快** ⭐⭐⭐ | -| **流媒体服务器** | 374.6 ns/op
79,508 MB-alloc
134 gc-runs | 30.29 ns/op
601 MB-alloc
2 gc-runs | **12.4x 快** ⭐⭐⭐ | - -#### 3.4.2 GC 压力对比(核心发现) - -**GC 压力测试**结果最能体现长期运行的差异: - -**bufio.Reader**: -``` -操作延迟: 1874 ns/op -内存分配次数: 5,576,659 次(超过 500 万次!) -GC 次数: 3 次 -每次操作: 2 allocs/op -``` - -**BufReader**: -``` -操作延迟: 112.7 ns/op (快 16.6 倍) -内存分配次数: 3,918 次(减少 99.93%) -GC 次数: 2 次 -每次操作: 0 allocs/op(零分配!) -``` - -**关键指标**: -- 🚀 **吞吐量提升 16 倍**:45.7M ops/s vs 2.8M ops/s -- ⭐ **内存分配减少 99.93%**:从 557 万次降至 3918 次 -- ✨ **零分配操作**:0 allocs/op vs 2 allocs/op - -#### 3.4.3 流媒体服务器场景(真实应用) - -模拟 100 个并发流,持续读取和转发数据: - -**bufio.Reader**: -``` -操作延迟: 374.6 ns/op -内存分配: 79,508 MB(79 GB!) -GC 次数: 134 次 -每次操作: 4 allocs/op -``` - -**BufReader**: -``` -操作延迟: 30.29 ns/op(快 12.4 倍) -内存分配: 601 MB(减少 99.2%) -GC 次数: 2 次(减少 98.5%!) -每次操作: 0 allocs/op -``` - -**惊人的差异**: -- 🎯 **GC 次数:134 次 → 2 次**(减少 98.5%) -- 💾 **内存分配:79 GB → 0.6 GB**(减少 132 倍) -- ⚡ **吞吐量:10.1M → 117M ops/s**(提升 11.6 倍) - -#### 3.4.4 长期运行的影响 - -在流媒体服务器场景下,**1 小时运行**的预估对比: - -**bufio.Reader**: -``` -预计内存分配:~2.8 TB -预计 GC 次数:~4,800 次 -GC 停顿累计:显著 -``` - -**BufReader**: -``` -预计内存分配:~21 GB(减少 133 倍) -预计 GC 次数:~72 次(减少 67 倍) -GC 停顿累计:极小 -``` - -**使用建议**: - -| 场景 | 推荐使用 | 原因 | -|------|---------|------| -| 简单文件读取 | bufio.Reader | 标准库足够 | -| **高并发网络服务器** | **BufReader** ⭐ | **GC 次数减少 98%** | -| **流媒体数据处理** | **BufReader** ⭐ | **零分配,高吞吐** | -| **长期运行服务** | **BufReader** ⭐ | **系统更稳定** | - -#### 3.4.5 性能提升的本质原因 - -虽然在某些简单场景下 bufio.Reader 更快,但 BufReader 的设计目标不是在所有场景下都比 bufio.Reader 快,而是: - -1. **消除内存分配** - 在实际应用中避免频繁的 `make([]byte, n)` -2. **降低 GC 压力** - 通过对象池复用内存,减少垃圾回收负担 -3. **零拷贝处理** - 提供 `ReadRange` API 直接操作原始数据 -4. 
**链式缓冲** - 支持复杂的数据处理模式 - -在 **Monibuca 流媒体服务器** 这样的场景下,这些特性带来的价值远超过微秒级的操作延迟差异。 - -**实际影响**:在处理 1000 个并发流媒体连接时: - -```go -// bufio.Reader 方案 -// 每秒 1000 连接 × 30fps × 1024 字节/包 = 30,720,000 次分配 -// 每次分配 1024 字节 = 约 30GB/秒 的临时内存分配 -// 触发大量 GC - -// BufReader 方案 -// 0 次分配(内存复用) -// GC 压力降低 90%+ -// 系统稳定性显著提升 -``` - -**选择建议**: - -- 📁 **简单文件读取** → bufio.Reader -- 🔄 **高并发网络服务** → BufReader(GC 减少 98%) -- 💾 **长期运行服务** → BufReader(零分配) -- 🎯 **流媒体服务器** → BufReader(吞吐量提升 10-20x) - -## 4. 实际应用场景 - -### 4.1 RTSP 协议解析 - -```go -// 使用 BufReader 解析 RTSP 请求 -func parseRTSPRequest(conn net.Conn) (*RTSPRequest, error) { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - // 读取请求行:零拷贝,无内存分配 - requestLine, err := reader.ReadLine() - if err != nil { - return nil, err - } - - // 读取头部:直接操作内存块 - headers, err := reader.ReadMIMEHeader() - if err != nil { - return nil, err - } - - // 读取 body(如果有) - if contentLength := headers.Get("Content-Length"); contentLength != "" { - length, _ := strconv.Atoi(contentLength) - // ReadRange 提供零拷贝的数据访问 - var body []byte - err = reader.ReadRange(length, func(chunk []byte) { - body = append(body, chunk...) 
- }) - } - - return &RTSPRequest{ - RequestLine: requestLine, - Headers: headers, - }, nil -} -``` - -### 4.2 流媒体数据包解析 - -```go -// 使用 BufReader 解析 FLV 数据包 -func parseFLVPackets(conn net.Conn) error { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - for { - // 读取包头:4 字节 - packetType, err := reader.ReadByte() - if err != nil { - return err - } - - // 读取数据大小:3 字节大端序 - dataSize, err := reader.ReadBE32(3) - if err != nil { - return err - } - - // 读取时间戳:4 字节 - timestamp, err := reader.ReadBE32(4) - if err != nil { - return err - } - - // 跳过 StreamID:3 字节 - if err := reader.Skip(3); err != nil { - return err - } - - // 读取实际数据:零拷贝处理 - err = reader.ReadRange(int(dataSize), func(data []byte) { - // 直接处理数据,无需拷贝 - processPacket(packetType, timestamp, data) - }) - if err != nil { - return err - } - - // 跳过 previous tag size - if err := reader.Skip(4); err != nil { - return err - } - } -} -``` - -### 4.3 性能关键场景 - -BufReader 特别适合以下场景: - -1. **高频小包处理**:网络协议解析,RTP/RTCP 包处理 -2. **大数据流传输**:视频流、音频流的连续读取 -3. **协议多次读取**:需要分步骤读取不同长度数据的协议 -4. **低延迟要求**:实时流媒体传输,在线游戏 -5. **高并发场景**:大量并发连接的服务器 - -## 5. 
最佳实践 - -### 5.1 正确使用模式 - -```go -// ✅ 正确:创建时指定合适的块大小 -func goodExample(conn net.Conn) { - // 根据实际数据包大小选择块大小 - reader := util.NewBufReaderWithBufLen(conn, 16384) // 16KB 块 - defer reader.Recycle() // 确保资源回收 - - // 使用 ReadRange 实现零拷贝 - reader.ReadRange(1024, func(data []byte) { - // 直接处理,不要持有 data 的引用 - process(data) - }) -} - -// ❌ 错误:忘记回收资源 -func badExample1(conn net.Conn) { - reader := util.NewBufReader(conn) - // 缺少 defer reader.Recycle() - // 导致内存块无法归还对象池 -} - -// ❌ 错误:持有数据引用 -var globalData []byte - -func badExample2(conn net.Conn) { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - reader.ReadRange(1024, func(data []byte) { - // ❌ 错误:data 会在 Recycle 后被回收 - globalData = data // 悬空引用 - }) -} - -// ✅ 正确:需要保留数据时进行拷贝 -func goodExample2(conn net.Conn) { - reader := util.NewBufReader(conn) - defer reader.Recycle() - - var saved []byte - reader.ReadRange(1024, func(data []byte) { - // 需要保留时显式拷贝 - saved = make([]byte, len(data)) - copy(saved, data) - }) - // 现在可以安全使用 saved -} -``` - -### 5.2 块大小选择 - -```go -// 根据场景选择合适的块大小 -const ( - // 小包协议(如 RTSP, HTTP 头) - SmallPacketSize = 4 << 10 // 4KB - - // 中等数据流(如音频) - MediumPacketSize = 16 << 10 // 16KB - - // 大数据流(如视频) - LargePacketSize = 64 << 10 // 64KB -) - -func createReaderForProtocol(conn net.Conn, protocol string) *util.BufReader { - var bufSize int - switch protocol { - case "rtsp", "http": - bufSize = SmallPacketSize - case "audio": - bufSize = MediumPacketSize - case "video": - bufSize = LargePacketSize - default: - bufSize = util.defaultBufSize - } - return util.NewBufReaderWithBufLen(conn, bufSize) -} -``` - -### 5.3 错误处理 - -```go -func robustRead(conn net.Conn) error { - reader := util.NewBufReader(conn) - defer func() { - // 确保在任何情况下都回收资源 - reader.Recycle() - }() - - // 设置超时 - conn.SetReadDeadline(time.Now().Add(5 * time.Second)) - - // 读取数据 - data, err := reader.ReadBytes(1024) - if err != nil { - if err == io.EOF { - // 正常结束 - return nil - } - // 处理其他错误 - return fmt.Errorf("read error: %w", 
err) - } - - // 处理数据 - processData(data) - return nil -} -``` - -## 6. 性能优化技巧 - -### 6.1 批量处理 - -```go -// ✅ 优化:批量读取和处理 -func optimizedBatchRead(reader *util.BufReader) error { - // 一次性读取大块数据 - return reader.ReadRange(65536, func(chunk []byte) { - // 在回调中批量处理 - for len(chunk) > 0 { - packetSize := int(binary.BigEndian.Uint32(chunk[:4])) - packet := chunk[4 : 4+packetSize] - processPacket(packet) - chunk = chunk[4+packetSize:] - } - }) -} - -// ❌ 低效:逐个读取 -func inefficientRead(reader *util.BufReader) error { - for { - size, err := reader.ReadBE32(4) - if err != nil { - return err - } - packet, err := reader.ReadBytes(int(size)) - if err != nil { - return err - } - processPacket(packet.Buffers[0]) - } -} -``` - -### 6.2 避免不必要的拷贝 - -```go -// ✅ 优化:直接处理,无拷贝 -func zeroCopyProcess(reader *util.BufReader) error { - return reader.ReadRange(4096, func(data []byte) { - // 直接在原始内存上操作 - sum := 0 - for _, b := range data { - sum += int(b) - } - reportChecksum(sum) - }) -} - -// ❌ 低效:不必要的拷贝 -func unnecessaryCopy(reader *util.BufReader) error { - mem, err := reader.ReadBytes(4096) - if err != nil { - return err - } - // 又进行了一次拷贝 - data := make([]byte, mem.Size) - copy(data, mem.Buffers[0]) - - sum := 0 - for _, b := range data { - sum += int(b) - } - reportChecksum(sum) - return nil -} -``` - -### 6.3 合理的资源管理 - -```go -// ✅ 优化:使用对象池管理 BufReader -type ConnectionPool struct { - readers sync.Pool -} - -func (p *ConnectionPool) GetReader(conn net.Conn) *util.BufReader { - if reader := p.readers.Get(); reader != nil { - r := reader.(*util.BufReader) - // 重新初始化 - return r - } - return util.NewBufReader(conn) -} - -func (p *ConnectionPool) PutReader(reader *util.BufReader) { - reader.Recycle() // 回收内存块 - p.readers.Put(reader) // 回收 BufReader 对象本身 -} - -// 使用连接池 -func handleConnection(pool *ConnectionPool, conn net.Conn) { - reader := pool.GetReader(conn) - defer pool.PutReader(reader) - - // 处理连接 - processConnection(reader) -} -``` - -## 7. 
总结 - -### 7.1 性能对比可视化 - -基于实际基准测试结果(并发场景): +#### 性能可视化 ``` -📊 GC 次数对比(核心优势)⭐⭐⭐ +📊 GC 次数对比(核心优势) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ bufio.Reader ████████████████████████████████████████████████████████████████ 134 次 BufReader █ 2 次 ← 减少 98.5%! -📊 内存分配总量对比 +📊 内存分配总量 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ bufio.Reader ████████████████████████████████████████████████████████████████ 79 GB BufReader █ 0.6 GB ← 减少 99.2%! -📊 操作吞吐量对比 +📊 吞吐量对比 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ bufio.Reader █████ 10.1M ops/s -BufReader ████████████████████████████████████████████████████████ 117M ops/s ← 11.6x! +BufReader ████████████████████████████████████████████████████████ 117M ops/s ``` -**关键指标**(流媒体服务器场景): -- 🎯 **GC 次数**:从 134 次降至 2 次(减少 98.5%) -- 💾 **内存分配**:从 79 GB 降至 0.6 GB(减少 132 倍) -- ⚡ **吞吐量**:提升 11.6 倍 +### 3.3 为什么非连续内存这么快? -### 7.2 核心优势 +**原因 1:零拷贝传递** +```go +// bufio - 必须拷贝 +buf := make([]byte, 1024) +reader.Read(buf) // 拷贝到连续内存 -BufReader 通过以下设计实现了零拷贝的高性能网络数据读取: +// BufReader - 传递引用 +reader.ReadRange(1024, func(chunk []byte) { + // chunk 是原始内存块,无拷贝 +}) +``` -1. **零拷贝架构** - - 数据直接从网络读取到最终内存位置 - - 使用切片视图避免数据拷贝 - - 链式缓冲区支持大块数据处理 +**原因 2:内存块复用** +``` +bufio: 分配 → 使用 → GC → 再分配 → ... +BufReader: 分配 → 使用 → 归还池 → 从池复用 → ... + ↑ 同一块内存反复使用,不触发 GC +``` -2. **内存复用机制** - - GoMem 对象池复用内存块 - - 主动内存管理减少 GC 压力 - - 可配置的块大小适应不同场景 +**原因 3:多订阅者共享** +``` +传统方式:1 个数据包 → 拷贝 10 份 → 10 个订阅者 +BufReader:1 个数据包 → 传递引用 → 10 个订阅者共享 + ↑ 只需 1 块内存,10 个订阅者都引用它 +``` -3. **性能提升显著**(在并发场景下) - - GC 次数减少 98.5%(134 → 2) - - 内存分配减少 99.2%(79 GB → 0.6 GB) - - 吞吐量提升 10-20 倍 - - 系统稳定性显著提升 +## 4. 
使用指南

### 4.1 基本使用

```go
func handleConnection(conn net.Conn) {
    // 创建 BufReader
    reader := util.NewBufReader(conn)
    defer reader.Recycle() // 归还所有内存块到对象池
    
    // 零拷贝读取和处理
    reader.ReadRange(4096, func(chunk []byte) {
        // chunk 是非连续的内存块
        // 直接处理,无需拷贝
        processChunk(chunk)
    })
}
```

### 4.2 实际应用场景

**场景 1:协议解析**

```go
// 解析 FLV 数据包(header + data)
func parseFLV(reader *BufReader) {
    // 读取包类型(1 字节)
    packetType, _ := reader.ReadByte()
    
    // 读取数据大小(3 字节)
    dataSize, _ := reader.ReadBE32(3)
    
    // 跳过时间戳等(7 字节)
    reader.Skip(7)
    
    // 零拷贝读取数据(可能跨越多个非连续块)
    reader.ReadRange(int(dataSize), func(chunk []byte) {
        // chunk 可能是完整数据,也可能是其中一部分
        // 逐块解析,无需等待完整数据
        parseDataChunk(packetType, chunk)
    })
}
```

**场景 2:高并发转发**

```go
// 从一个源读取,转发给多个目标
func relay(source *BufReader, targets []io.Writer) {
    source.ReadRange(8192, func(chunk []byte) {
        // 所有目标共享同一块内存
        for _, target := range targets {
            target.Write(chunk) // 零拷贝转发
        }
    })
}
```

**场景 3:流媒体服务器**

```go
// 接收 RTSP 流并分发给订阅者
type Stream struct {
    reader *BufReader
    subscribers []*Subscriber
}

func (s *Stream) Process() {
    s.reader.ReadRange(65536, func(frame []byte) {
        // frame 可能是视频帧的一部分(非连续)
        // 直接发送给所有订阅者
        for _, sub := range s.subscribers {
            sub.WriteFrame(frame) // 共享内存,零拷贝
        }
    })
}
```

### 4.3 最佳实践

**✅ 正确用法**:

```go
// 1. 总是回收资源
reader := util.NewBufReader(conn)
defer reader.Recycle()

// 2. 在回调中直接处理,不要保存引用
reader.ReadRange(1024, func(data []byte) {
    processData(data) // ✅ 立即处理
})

// 3. 
需要保留时显式拷贝 +var saved []byte +reader.ReadRange(1024, func(data []byte) { + saved = append(saved, data...) // ✅ 显式拷贝 +}) +``` + +**❌ 错误用法**: + +```go +// ❌ 不要保存引用 +var dangling []byte +reader.ReadRange(1024, func(data []byte) { + dangling = data // 错误:data 会被回收 +}) +// dangling 现在是悬空引用! + +// ❌ 不要忘记回收 +reader := util.NewBufReader(conn) +// 缺少 defer reader.Recycle() +// 内存块无法归还对象池 +``` + +### 4.4 性能优化技巧 + +**技巧 1:批量处理** + +```go +// ✅ 优化:一次读取多个数据包 +reader.ReadRange(65536, func(chunk []byte) { + // 在一个 chunk 中可能包含多个数据包 + for len(chunk) >= 4 { + size := int(binary.BigEndian.Uint32(chunk[:4])) + packet := chunk[4 : 4+size] + processPacket(packet) + chunk = chunk[4+size:] + } +}) +``` + +**技巧 2:选择合适的块大小** + +```go +// 根据应用场景选择 +const ( + SmallPacket = 4 << 10 // 4KB - RTSP/HTTP + MediumPacket = 16 << 10 // 16KB - 音频流 + LargePacket = 64 << 10 // 64KB - 视频流 +) + +reader := util.NewBufReaderWithBufLen(conn, LargePacket) +``` + +## 5. 总结 + +### 核心创新:非连续内存缓冲 + +BufReader 的核心不是"更好的缓冲区",而是**彻底改变内存布局模型**: + +``` +传统思维:数据必须在连续内存中 +BufReader:数据可以分散在多个块中,通过引用传递 + +结果: +✓ 零拷贝:不需要重组成连续内存 +✓ 零分配:内存块从对象池复用 +✓ 零 GC 压力:不产生临时对象 +``` + +### 关键优势 + +| 特性 | 实现方式 | 性能影响 | +|------|---------|---------| +| **零拷贝** | 传递内存块引用 | 无拷贝开销 | +| **零分配** | 对象池复用 | GC 减少 98.5% | +| **多订阅者共享** | 同一块被多次引用 | 内存节省 10x+ | +| **灵活块大小** | 适应网络波动 | 无需重组 | + +### 适用场景 + +| 场景 | 推荐 | 原因 | +|------|------|------| +| **高并发网络服务器** | BufReader ⭐ | GC 减少 98%,吞吐量提升 10x+ | +| **流媒体转发** | BufReader ⭐ | 零拷贝多播,内存共享 | +| **协议解析器** | BufReader ⭐ | 逐块解析,无需完整包 | +| **长期运行服务** | BufReader ⭐ | 系统稳定,GC 影响极小 | +| 简单文件读取 | bufio.Reader | 标准库足够 | + +### 关键要点 使用 BufReader 时记住: -1. **始终调用 Recycle()**:确保内存块归还对象池 -2. **不要持有数据引用**:ReadRange 回调中的数据会被回收 -3. **选择合适的块大小**:根据实际数据包大小调整 -4. **利用 ReadRange**:实现真正的零拷贝处理 -5. **配合 GoMem 使用**:充分发挥内存复用优势 +1. **接受非连续数据**:通过回调处理每个块 +2. **不要持有引用**:数据在回调返回后会被回收 +3. **利用 ReadRange**:这是零拷贝的核心 API +4. 
**必须调用 Recycle()**:归还内存块到对象池 -通过 BufReader 和 GoMem 的配合,Monibuca 实现了高性能的网络数据处理,为流媒体服务器提供了坚实的基础设施支持。 +### 性能数据 + +**流媒体服务器(100 并发流,持续运行)**: + +``` +1 小时运行预估: + +bufio.Reader(连续内存): +- 分配 2.8 TB 内存 +- 触发 4,800 次 GC +- 系统频繁停顿 + +BufReader(非连续内存): +- 分配 21 GB 内存(减少 133x) +- 触发 72 次 GC(减少 67x) +- 系统几乎无 GC 影响 +``` + +### 测试和文档 + +**运行测试**: +```bash +sh scripts/benchmark_bufreader.sh +``` + +**详细文档**: +- 中文:`doc_CN/bufreader_analysis.md` +- English: `doc/bufreader_analysis.md` +- 非连续内存专题:`doc/bufreader_non_contiguous_buffer.md` ## 参考资料 -- [GoMem 项目](https://github.com/langhuihui/gomem) -- [Monibuca v5 文档](https://monibuca.com) -- [对象复用技术详解](./arch/reuse.md) -- Go 标准库 `bufio` 包源码 -- Go 标准库 `sync.Pool` 文档 +- [GoMem 项目](https://github.com/langhuihui/gomem) - 内存对象池实现 +- [Monibuca v5](https://m7s.live) - 流媒体服务器 +- 测试代码:`pkg/util/buf_reader_benchmark_test.go` +--- + +**核心思想**:通过非连续内存块链表和零拷贝引用传递,消除传统连续缓冲区的拷贝开销,实现高性能网络数据处理。 diff --git a/go.mod b/go.mod index 3ba187f..29a4f03 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/beevik/etree v1.4.1 github.com/bluenviron/gohlslib v1.4.0 github.com/c0deltin/duckdb-driver v0.1.0 - github.com/cilium/ebpf v0.15.0 github.com/cloudwego/goref v0.0.0-20240724113447-685d2a9523c8 github.com/deepch/vdk v0.0.27 github.com/disintegration/imaging v1.6.2 @@ -24,7 +23,7 @@ require ( github.com/icholy/digest v1.1.0 github.com/jinzhu/copier v0.4.0 github.com/kerberos-io/onvif v1.0.0 - github.com/langhuihui/gotask v1.0.0 + github.com/langhuihui/gotask v1.0.1 github.com/mark3labs/mcp-go v0.27.0 github.com/mattn/go-sqlite3 v1.14.24 github.com/mcuadros/go-defaults v1.2.0 @@ -44,7 +43,6 @@ require ( github.com/stretchr/testify v1.10.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.69 github.com/valyala/fasthttp v1.61.0 - github.com/vishvananda/netlink v1.1.0 github.com/yapingcat/gomedia v0.0.0-20240601043430-920523f8e5c7 golang.org/x/image v0.22.0 golang.org/x/text v0.27.0 @@ -70,6 +68,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // 
indirect github.com/chromedp/cdproto v0.0.0-20240202021202-6d0b6a386732 // indirect github.com/chromedp/sysutil v1.0.0 // indirect + github.com/cilium/ebpf v0.15.0 // indirect github.com/clbanning/mxj v1.8.4 // indirect github.com/clbanning/mxj/v2 v2.7.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -128,7 +127,6 @@ require ( github.com/valyala/gozstd v1.21.1 // indirect github.com/valyala/histogram v1.2.0 // indirect github.com/valyala/quicktemplate v1.8.0 // indirect - github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect github.com/wlynxg/anet v0.0.5 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect @@ -157,7 +155,7 @@ require ( golang.org/x/exp v0.0.0-20240716175740-e3f259677ff7 golang.org/x/mod v0.25.0 // indirect golang.org/x/net v0.41.0 - golang.org/x/sys v0.34.0 + golang.org/x/sys v0.34.0 // indirect golang.org/x/tools v0.34.0 // indirect gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 3a3d71c..ecee15c 100644 --- a/go.sum +++ b/go.sum @@ -160,8 +160,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/langhuihui/gomem v0.0.0-20251001011839-023923cf7683 h1:lITBgMb71ad6OUU9gycsheCw9PpMbXy3/QA8T0V0dVM= github.com/langhuihui/gomem v0.0.0-20251001011839-023923cf7683/go.mod h1:BTPq1+4YUP4i7w8VHzs5AUIdn3T5gXjIUXbxgHW9TIQ= -github.com/langhuihui/gotask v1.0.0 h1:UOs7IQQ5XVZyj1XRiHuHW2o/aQQV3s2LN/DiJyd6DPw= -github.com/langhuihui/gotask v1.0.0/go.mod h1:2zNqwV8M1pHoO0b5JC/A37oYpdtXrfL10Qof9AvR5IE= +github.com/langhuihui/gotask v1.0.1 h1:X+xETKZQ+OdRO8pNYudNdJH4yZ2QJM6ehHQVjw1i5RY= +github.com/langhuihui/gotask v1.0.1/go.mod h1:2zNqwV8M1pHoO0b5JC/A37oYpdtXrfL10Qof9AvR5IE= github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80 h1:6Yzfa6GP0rIo/kULo2bwGEkFvCePZ3qHDDTC3/J9Swo= 
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= @@ -312,10 +312,6 @@ github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OL github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/valyala/quicktemplate v1.8.0 h1:zU0tjbIqTRgKQzFY1L42zq0qR3eh4WoQQdIdqCysW5k= github.com/valyala/quicktemplate v1.8.0/go.mod h1:qIqW8/igXt8fdrUln5kOSb+KWMaJ4Y8QUsfd1k6L2jM= -github.com/vishvananda/netlink v1.1.0 h1:1iyaYNBLmP6L0220aDnYQpo1QEV4t4hJ+xEEhhJH8j0= -github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= -github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k= -github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= @@ -344,7 +340,6 @@ golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=