mirror of
https://github.com/nabbar/golib.git
synced 2025-12-24 11:51:02 +08:00
- FIX: potential CWE-400 with bufio.ReadBytes & bufio.ReadSlices, with no limited read buffer - ADD: test to check overflow buffer with discard or error - REFACTOR: all buffering package, parsing process - UPDATE: doc, examples, test following changes - OPTIMIZE: rework code to optimize process - REWORK: benchmark to check benefit of optimization - FIX: wording error Package IOUtils/Multi: - REWORK: re-design all package to allow sequential/parallel mode - UPDATE: package with adaptive mode to allow switch automatically between sequential and parallel mode following measurement of sample - OPTIMIZE: code to maximize bandwidth and reduce time of write - UPDATE: documentation, test and comments - REWORK: testing organization and benchmark aggregation Package HttpServer: - FIX: bug with dial addr rewrite for healthcheck & testing PortUse Package Logger/HookFile: - FIX: bug with race condition on aggregator counter file Other: - Bump dependencies - FIX: format / import file
272 lines
11 KiB
Go
272 lines
11 KiB
Go
/*
|
|
* MIT License
|
|
*
|
|
* Copyright (c) 2024 Nicolas JUHEL
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
* of this software and associated documentation files (the "Software"), to deal
|
|
* in the Software without restriction, including without limitation the rights
|
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
* copies of the Software, and to permit persons to whom the Software is
|
|
* furnished to do so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in all
|
|
* copies or substantial portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
* SOFTWARE.
|
|
*
|
|
*
|
|
*/
|
|
|
|
package ioprogress
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"sync/atomic"
|
|
|
|
libatm "github.com/nabbar/golib/atomic"
|
|
libfpg "github.com/nabbar/golib/file/progress"
|
|
)
|
|
|
|
// rdr implements the Reader interface by wrapping an io.ReadCloser with progress
|
|
// tracking capabilities using atomic operations for lock-free thread safety.
|
|
//
|
|
// Architecture: This structure uses composition to extend io.ReadCloser functionality
|
|
// while maintaining full compatibility with the standard io interfaces. All state
|
|
// mutations use atomic primitives to ensure thread-safe concurrent access without mutexes.
|
|
//
|
|
// Memory Layout:
|
|
// - r: 16 bytes (interface: pointer + type)
|
|
// - cr: 8 bytes (pointer to atomic.Int64)
|
|
// - fi, fe, fr: ~24 bytes each (atomic.Value with function pointer)
|
|
// Total: ~120 bytes per instance
|
|
//
|
|
// Thread Safety: All fields except 'r' use atomic operations. The underlying reader 'r'
|
|
// is accessed only through the Read() and Close() methods, and concurrent reads on the
|
|
// same reader should be managed by the caller according to io.Reader semantics.
|
|
type rdr struct {
|
|
r io.ReadCloser // underlying reader (caller manages concurrency)
|
|
cr *atomic.Int64 // cumulative byte counter (atomic updates)
|
|
fi libatm.Value[libfpg.FctIncrement] // increment callback (atomic load/store)
|
|
fe libatm.Value[libfpg.FctEOF] // EOF callback (atomic load/store)
|
|
fr libatm.Value[libfpg.FctReset] // reset callback (atomic load/store)
|
|
}
|
|
|
|
// Read implements io.Reader by delegating to the underlying reader and tracking progress.
|
|
//
|
|
// This method performs the following operations in order:
|
|
// 1. Delegate the read operation to the underlying reader
|
|
// 2. Update the cumulative byte counter atomically (even if err != nil)
|
|
// 3. Invoke the increment callback with bytes read
|
|
// 4. If EOF is detected, invoke the EOF callback
|
|
// 5. Return the original (n, err) from the underlying reader
|
|
//
|
|
// Behavior:
|
|
// - The increment callback is invoked for every Read() call, even when n=0 or err!=nil
|
|
// - EOF detection uses errors.Is() to handle wrapped errors correctly
|
|
// - The EOF callback is invoked only once per EOF encounter
|
|
// - All state updates use atomic operations for thread-safe concurrent reads
|
|
//
|
|
// Thread Safety: Multiple goroutines can call Read() concurrently if the underlying
|
|
// reader supports it. However, most io.Reader implementations are not safe for
|
|
// concurrent reads, so callers should serialize Read() calls unless documented otherwise.
|
|
//
|
|
// Performance: Adds ~50-100ns overhead per call due to atomic operations and callback
|
|
// invocation. Zero allocations in normal operation.
|
|
func (r *rdr) Read(p []byte) (n int, err error) {
|
|
n, err = r.r.Read(p)
|
|
r.inc(n) // Always track progress, even on error
|
|
|
|
if errors.Is(err, io.EOF) {
|
|
r.finish() // Invoke EOF callback
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
// Close implements io.Closer by closing the underlying reader.
|
|
//
|
|
// This method propagates the close operation to the wrapped reader without
|
|
// modifying any progress tracking state. The cumulative counter and registered
|
|
// callbacks remain accessible after closing, though no further I/O should occur.
|
|
//
|
|
// Thread Safety: Safe to call concurrently, though behavior depends on the
|
|
// underlying reader's Close() implementation. Most readers are safe to close
|
|
// multiple times (idempotent).
|
|
//
|
|
// Typical Usage: Always defer Close() immediately after creating the wrapper:
|
|
//
|
|
// reader := ioprogress.NewReadCloser(file)
|
|
// defer reader.Close() // Closes both wrapper and underlying file
|
|
func (r *rdr) Close() error {
|
|
return r.r.Close()
|
|
}
|
|
|
|
// RegisterFctIncrement implements Progress by storing the increment callback.
|
|
//
|
|
// This method registers a callback that will be invoked after every Read() operation
|
|
// with the number of bytes transferred in that operation. The callback executes
|
|
// synchronously in the same goroutine as Read().
|
|
//
|
|
// Nil Handling: If fct is nil, it is converted to a no-op function before storage.
|
|
// This prevents atomic.Value.Store(nil) panic, as atomic.Value cannot store nil.
|
|
// This approach ensures the atomic field always contains a valid function pointer,
|
|
// eliminating the need for nil checks during Load() operations.
|
|
//
|
|
// Thread Safety: This method uses atomic.Value.Store() for lock-free concurrent access.
|
|
// It can be called safely from multiple goroutines, including during ongoing Read()
|
|
// operations. The new callback replaces any previously registered callback atomically.
|
|
//
|
|
// Performance: The callback must complete quickly (<1ms recommended) to avoid degrading
|
|
// I/O throughput. Use atomic operations for counter updates to maintain thread safety.
|
|
func (r *rdr) RegisterFctIncrement(fct libfpg.FctIncrement) {
|
|
if fct == nil {
|
|
// Convert nil to no-op to prevent atomic.Value.Store(nil) panic.
|
|
// This is more robust than checking for nil on every Load().
|
|
fct = func(size int64) {}
|
|
}
|
|
|
|
r.fi.Store(fct)
|
|
}
|
|
|
|
// RegisterFctReset implements Progress by storing the reset callback.
|
|
//
|
|
// This method registers a callback that will be invoked when Reset() is called,
|
|
// receiving the maximum expected size and current cumulative progress. Useful for
|
|
// multi-stage operations where progress needs to be reported relative to different
|
|
// total sizes or processing phases.
|
|
//
|
|
// Nil Handling: If fct is nil, it is converted to a no-op function before storage.
|
|
// This prevents atomic.Value.Store(nil) panic, ensuring robust operation without
|
|
// runtime nil pointer checks.
|
|
//
|
|
// Thread Safety: This method uses atomic.Value.Store() for lock-free concurrent access.
|
|
// Safe to call from multiple goroutines concurrently.
|
|
func (r *rdr) RegisterFctReset(fct libfpg.FctReset) {
|
|
if fct == nil {
|
|
// Convert nil to no-op to prevent atomic.Value.Store(nil) panic.
|
|
fct = func(size, current int64) {}
|
|
}
|
|
|
|
r.fr.Store(fct)
|
|
}
|
|
|
|
// RegisterFctEOF implements Progress by storing the EOF callback.
|
|
//
|
|
// This method registers a callback that will be invoked when io.EOF is detected
|
|
// during a Read() operation, indicating that all data has been consumed from the
|
|
// underlying reader. The callback executes immediately after EOF detection.
|
|
//
|
|
// Nil Handling: If fct is nil, it is converted to a no-op function before storage.
|
|
// This prevents atomic.Value.Store(nil) panic, maintaining robust operation.
|
|
//
|
|
// Thread Safety: This method uses atomic.Value.Store() for lock-free concurrent access.
|
|
// Safe to call from multiple goroutines concurrently.
|
|
func (r *rdr) RegisterFctEOF(fct libfpg.FctEOF) {
|
|
if fct == nil {
|
|
// Convert nil to no-op to prevent atomic.Value.Store(nil) panic.
|
|
fct = func() {}
|
|
}
|
|
|
|
r.fe.Store(fct)
|
|
}
|
|
|
|
// inc atomically increments the cumulative byte counter and invokes the increment callback.
|
|
//
|
|
// This internal method is called after every Read() operation to update progress tracking.
|
|
// It performs two atomic operations:
|
|
// 1. Add n to the cumulative counter using atomic.Int64.Add()
|
|
// 2. Load and invoke the increment callback using atomic.Value.Load()
|
|
//
|
|
// Behavior:
|
|
// - Always invoked by Read(), even when n=0 or an error occurred
|
|
// - The callback receives the incremental bytes (n), not the cumulative total
|
|
// - Nil check on r is defensive programming for safety, though should never be nil
|
|
// - No nil check on f because RegisterFct* methods ensure it's never nil
|
|
//
|
|
// Thread Safety: Uses atomic operations exclusively, safe for concurrent access.
|
|
//
|
|
// Performance: ~30-50ns overhead (atomic add + load + function call).
|
|
func (r *rdr) inc(n int) {
|
|
if r == nil {
|
|
// Defensive nil check, should never happen in normal operation
|
|
return
|
|
}
|
|
|
|
r.cr.Add(int64(n)) // Atomic increment of cumulative counter
|
|
|
|
f := r.fi.Load() // Atomic load of callback function
|
|
if f != nil {
|
|
f(int64(n)) // Invoke with incremental size, not cumulative
|
|
}
|
|
}
|
|
|
|
// finish invokes the EOF callback when EOF is detected.
|
|
//
|
|
// This internal method is called by Read() when errors.Is(err, io.EOF) returns true,
|
|
// indicating that the underlying reader has reached end-of-file. It provides a
|
|
// notification mechanism for completion detection.
|
|
//
|
|
// Behavior:
|
|
// - Called automatically by Read() upon EOF detection
|
|
// - May be invoked multiple times if Read() is called repeatedly after EOF
|
|
// - Uses atomic.Value.Load() to retrieve the callback safely
|
|
// - No nil check on f because RegisterFctEOF ensures it's never nil
|
|
//
|
|
// Thread Safety: Safe for concurrent access through atomic operations.
|
|
func (r *rdr) finish() {
|
|
if r == nil {
|
|
// Defensive nil check
|
|
return
|
|
}
|
|
|
|
f := r.fe.Load() // Atomic load of EOF callback
|
|
if f != nil {
|
|
f() // Invoke EOF notification
|
|
}
|
|
}
|
|
|
|
// Reset implements Progress by invoking the reset callback with the maximum size
|
|
// and current progress.
|
|
//
|
|
// This method provides a way to report progress relative to a known total size or
|
|
// to signal stage transitions in multi-phase operations. It does NOT reset the
|
|
// cumulative counter; it only invokes the callback with current values.
|
|
//
|
|
// Parameters:
|
|
// - max: The maximum expected size in bytes, or 0 if unknown
|
|
//
|
|
// Behavior:
|
|
// - Loads the current cumulative count atomically
|
|
// - Invokes the reset callback with (max, current)
|
|
// - The cumulative counter remains unchanged
|
|
// - Useful for updating progress bars or reporting stage completion
|
|
//
|
|
// Thread Safety: Safe for concurrent access. Uses atomic operations for both
|
|
// callback retrieval (atomic.Value.Load) and counter reading (atomic.Int64.Load).
|
|
//
|
|
// Example Use Case:
|
|
//
|
|
// reader.Reset(fileSize) // Update progress bar with total size
|
|
// validateData(reader) // First pass
|
|
// reader.Reset(fileSize) // Update for second pass
|
|
// processData(reader) // Second pass
|
|
func (r *rdr) Reset(max int64) {
|
|
if r == nil {
|
|
// Defensive nil check
|
|
return
|
|
}
|
|
|
|
f := r.fr.Load() // Atomic load of reset callback
|
|
if f != nil {
|
|
f(max, r.cr.Load()) // Invoke with max and current cumulative count
|
|
}
|
|
}
|