Files
golib/ioutils/ioprogress/writer.go
nabbar 96ed6f9a1f [Package IOUtils/Delim]
- FIX: potential CWE-400 with bufio.ReadBytes & bufio.ReadSlices, with
  no limited read buffer
- ADD: test to check overflow buffer with discard or error
- REFACTOR: all buffering package, parsing process
- UPDATE: doc, examples, test following changes
- OPTIMIZE: rework code to optimize process
- REWORK: benchmark to check benefice of optimization
- FIX: wording error

Package IOUtils/Multi:
- REWORK: re-design all package to allow sequential/parallel mode
- UPDATE: package with adaptive mode to allow switch automaticly between
  sequential and parallel mode following measurment of sample
- OPTIMIZE: code to maximize bandwith and reduce time of write
- UPDATE: documentation, test and comments
- REWORK: testing organization and benchmark aggregation

Package HttpServer:
- FIX: bug with dial addr rewrite for healtcheck & testing PortUse

Package Logger/HookFile:
- FIX: bug with race condition on aggregator counter file

Other:
- Bump dependencies
- FIX: format / import file
2025-12-21 16:56:13 +01:00

274 lines
11 KiB
Go

/*
* MIT License
*
* Copyright (c) 2024 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
*/
package ioprogress
import (
"errors"
"io"
"sync/atomic"
libatm "github.com/nabbar/golib/atomic"
libfpg "github.com/nabbar/golib/file/progress"
)
// wrt implements the Writer interface by wrapping an io.WriteCloser with progress
// tracking capabilities using atomic operations for lock-free thread safety.
//
// Architecture: This structure mirrors the rdr implementation, using composition to
// extend io.WriteCloser functionality while maintaining full compatibility with standard
// io interfaces. All state mutations use atomic primitives for thread-safe concurrent access.
//
// Memory Layout:
// - w: 16 bytes (interface: pointer + type)
// - cr: 8 bytes (pointer to atomic.Int64)
// - fi, fe, fr: ~24 bytes each (atomic.Value with function pointer)
// Total: ~120 bytes per instance
//
// Thread Safety: All fields except 'w' use atomic operations. The underlying writer 'w'
// is accessed only through Write() and Close() methods. Concurrent writes should be
// managed by the caller according to io.Writer semantics.
type wrt struct {
w io.WriteCloser // underlying writer (caller manages concurrency)
cr *atomic.Int64 // cumulative byte counter (atomic updates)
fi libatm.Value[libfpg.FctIncrement] // increment callback (atomic load/store)
fe libatm.Value[libfpg.FctEOF] // EOF callback (atomic load/store)
fr libatm.Value[libfpg.FctReset] // reset callback (atomic load/store)
}
// Write implements io.Writer by delegating to the underlying writer and tracking progress.
//
// This method performs the following operations in order:
// 1. Delegate the write operation to the underlying writer
// 2. Update the cumulative byte counter atomically (even if err != nil)
// 3. Invoke the increment callback with bytes written
// 4. If EOF is detected (rare for writes), invoke the EOF callback
// 5. Return the original (n, err) from the underlying writer
//
// Behavior:
// - The increment callback is invoked for every Write() call, even when n=0 or err!=nil
// - EOF detection uses errors.Is() to handle wrapped errors, though EOF is uncommon for writers
// - All state updates use atomic operations for thread-safe concurrent writes
//
// Thread Safety: Multiple goroutines can call Write() concurrently if the underlying
// writer supports it. However, most io.Writer implementations are not safe for concurrent
// writes, so callers should serialize Write() calls unless documented otherwise.
//
// Performance: Adds ~50-100ns overhead per call due to atomic operations and callback
// invocation. Zero allocations in normal operation.
func (w *wrt) Write(p []byte) (n int, err error) {
n, err = w.w.Write(p)
w.inc(n) // Always track progress, even on error
if errors.Is(err, io.EOF) {
w.finish() // Invoke EOF callback (rare for writes)
}
return n, err
}
// Close implements io.Closer by closing the underlying writer.
//
// This method propagates the close operation to the wrapped writer without
// modifying any progress tracking state. The cumulative counter and registered
// callbacks remain accessible after closing, though no further I/O should occur.
//
// Thread Safety: Safe to call concurrently, though behavior depends on the
// underlying writer's Close() implementation. Most writers are safe to close
// multiple times (idempotent).
//
// Typical Usage: Always defer Close() immediately after creating the wrapper:
//
// writer := ioprogress.NewWriteCloser(file)
// defer writer.Close() // Closes both wrapper and underlying file
func (w *wrt) Close() error {
return w.w.Close()
}
// RegisterFctIncrement implements Progress by storing the increment callback.
//
// This method registers a callback that will be invoked after every Write() operation
// with the number of bytes transferred in that operation. The callback executes
// synchronously in the same goroutine as Write().
//
// Nil Handling: If fct is nil, it is converted to a no-op function before storage.
// This prevents atomic.Value.Store(nil) panic, as atomic.Value cannot store nil.
// This approach ensures the atomic field always contains a valid function pointer,
// eliminating the need for nil checks during Load() operations.
//
// Thread Safety: This method uses atomic.Value.Store() for lock-free concurrent access.
// It can be called safely from multiple goroutines, including during ongoing Write()
// operations. The new callback replaces any previously registered callback atomically.
//
// Performance: The callback must complete quickly (<1ms recommended) to avoid degrading
// I/O throughput. Use atomic operations for counter updates to maintain thread safety.
func (w *wrt) RegisterFctIncrement(fct libfpg.FctIncrement) {
if fct == nil {
// Convert nil to no-op to prevent atomic.Value.Store(nil) panic.
// This is more robust than checking for nil on every Load().
fct = func(size int64) {}
}
w.fi.Store(fct)
}
// RegisterFctReset implements Progress by storing the reset callback.
//
// This method registers a callback that will be invoked when Reset() is called,
// receiving the maximum expected size and current cumulative progress. Useful for
// multi-stage operations where progress needs to be reported relative to different
// total sizes or processing phases.
//
// Nil Handling: If fct is nil, it is converted to a no-op function before storage.
// This prevents atomic.Value.Store(nil) panic, ensuring robust operation without
// runtime nil pointer checks.
//
// Thread Safety: This method uses atomic.Value.Store() for lock-free concurrent access.
// Safe to call from multiple goroutines concurrently.
func (w *wrt) RegisterFctReset(fct libfpg.FctReset) {
if fct == nil {
// Convert nil to no-op to prevent atomic.Value.Store(nil) panic.
fct = func(size, current int64) {}
}
w.fr.Store(fct)
}
// RegisterFctEOF implements Progress by storing the EOF callback.
//
// This method registers a callback that will be invoked when io.EOF is detected during
// a Write() operation. Note that EOF is rare for write operations and this callback
// exists primarily for API completeness and symmetry with Reader.
//
// Nil Handling: If fct is nil, it is converted to a no-op function before storage.
// This prevents atomic.Value.Store(nil) panic, maintaining robust operation.
//
// Thread Safety: This method uses atomic.Value.Store() for lock-free concurrent access.
// Safe to call from multiple goroutines concurrently.
func (w *wrt) RegisterFctEOF(fct libfpg.FctEOF) {
if fct == nil {
// Convert nil to no-op to prevent atomic.Value.Store(nil) panic.
fct = func() {}
}
w.fe.Store(fct)
}
// inc atomically increments the cumulative byte counter and invokes the increment callback.
//
// This internal method is called after every Write() operation to update progress tracking.
// It performs two atomic operations:
// 1. Add n to the cumulative counter using atomic.Int64.Add()
// 2. Load and invoke the increment callback using atomic.Value.Load()
//
// Behavior:
// - Always invoked by Write(), even when n=0 or an error occurred
// - The callback receives the incremental bytes (n), not the cumulative total
// - Nil check on w is defensive programming for safety, though should never be nil
// - No nil check on f because RegisterFct* methods ensure it's never nil
//
// Thread Safety: Uses atomic operations exclusively, safe for concurrent access.
//
// Performance: ~30-50ns overhead (atomic add + load + function call).
func (w *wrt) inc(n int) {
if w == nil {
// Defensive nil check, should never happen in normal operation
return
}
w.cr.Add(int64(n)) // Atomic increment of cumulative counter
f := w.fi.Load() // Atomic load of callback function
if f != nil {
f(int64(n)) // Invoke with incremental size, not cumulative
}
}
// finish invokes the EOF callback when EOF is detected (rare for writers).
//
// This internal method is called by Write() when errors.Is(err, io.EOF) returns true.
// While EOF is uncommon for write operations, this method exists for completeness and
// symmetry with the Reader implementation.
//
// Behavior:
// - Called automatically by Write() upon EOF detection
// - Rarely invoked in practice (EOF is uncommon for writers)
// - Uses atomic.Value.Load() to retrieve the callback safely
// - No nil check on f because RegisterFctEOF ensures it's never nil
//
// Thread Safety: Safe for concurrent access through atomic operations.
//
// Note: This code path has lower test coverage (~80%) because triggering EOF on
// a writer is rare in typical scenarios.
func (w *wrt) finish() {
if w == nil {
// Defensive nil check
return
}
f := w.fe.Load() // Atomic load of EOF callback
if f != nil {
f() // Invoke EOF notification
}
}
// Reset implements Progress by invoking the reset callback with the maximum size
// and current progress.
//
// This method provides a way to report progress relative to a known total size or
// to signal stage transitions in multi-phase operations. It does NOT reset the
// cumulative counter; it only invokes the callback with current values.
//
// Parameters:
// - max: The maximum expected size in bytes, or 0 if unknown
//
// Behavior:
// - Loads the current cumulative count atomically
// - Invokes the reset callback with (max, current)
// - The cumulative counter remains unchanged
// - Useful for updating progress bars or reporting stage completion
//
// Thread Safety: Safe for concurrent access. Uses atomic operations for both
// callback retrieval (atomic.Value.Load) and counter reading (atomic.Int64.Load).
//
// Example Use Case:
//
// writer.Reset(totalSize) // Update progress bar with total size
// writePhase1(writer) // First write phase
// writer.Reset(totalSize) // Update for second phase
// writePhase2(writer) // Second write phase
func (w *wrt) Reset(max int64) {
if w == nil {
// Defensive nil check
return
}
f := w.fr.Load() // Atomic load of reset callback
if f != nil {
f(max, w.cr.Load()) // Invoke with max and current cumulative count
}
}