/* * MIT License * * Copyright (c) 2024 Nicolas JUHEL * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. * * */ package ioprogress import ( "io" "sync/atomic" libatm "github.com/nabbar/golib/atomic" libfpg "github.com/nabbar/golib/file/progress" ) // Progress defines the interface for I/O progress tracking with customizable callbacks. // // This interface provides methods to register callbacks that will be invoked during I/O // operations, enabling real-time monitoring of data transfer progress. All callbacks are // executed synchronously in the caller's goroutine and must complete quickly to avoid // impacting I/O performance. // // Thread Safety: All methods are thread-safe and can be called concurrently from multiple // goroutines. Callback registration uses atomic operations to ensure safe concurrent access. type Progress interface { // RegisterFctIncrement registers a callback function invoked after each I/O operation. // // The callback receives the number of bytes transferred in the current operation as its // argument. This allows tracking of incremental progress on every Read() or Write() call. // // Nil Safety: If fct is nil, it will be converted to a no-op function to prevent panics // when using atomic.Value. This ensures atomic operations remain valid without nil checks. // // Execution Context: The callback executes synchronously in the same goroutine performing // the I/O operation. It will be invoked even if the Read/Write operation returns an error, // ensuring consistent progress tracking. // // Performance: The callback must not block and should complete in <1ms to avoid degrading // I/O throughput. Use atomic operations for counter updates to maintain thread safety. // // Thread Safety: This method can be called concurrently with ongoing I/O operations. // The new callback replaces any previously registered callback atomically. RegisterFctIncrement(fct libfpg.FctIncrement) // RegisterFctReset registers a callback function invoked when Reset() is called. // // The callback receives two arguments: // - max: The maximum expected size (total bytes), or 0 if unknown // - current: The cumulative bytes processed since the last reset // // Use Case: Multi-stage operations where progress needs to be reported relative to // different stages or total sizes. For example, processing multiple files or phases. // // Nil Safety: If fct is nil, it will be converted to a no-op function to prevent panics // when using atomic.Value, ensuring safe atomic storage without nil pointer issues. // // Execution Context: The callback executes synchronously when Reset() is called, in the // same goroutine. It provides a snapshot of the current cumulative progress. // // Thread Safety: This method can be called concurrently. The new callback replaces any // previously registered callback atomically. RegisterFctReset(fct libfpg.FctReset) // RegisterFctEOF registers a callback function invoked when EOF is encountered. // // The callback takes no arguments and is triggered when io.EOF is detected during // a Read() or Write() operation, indicating completion of data transfer. // // Reader Behavior: For readers, this callback is reliably invoked when the underlying // reader returns io.EOF, typically after all data has been consumed. // // Writer Behavior: For writers, EOF is rare and this callback may not be invoked in // typical scenarios. It exists for completeness but has lower test coverage. // // Nil Safety: If fct is nil, it will be converted to a no-op function to prevent panics // when using atomic.Value. This ensures atomic.Value.Store() never receives nil. // // Execution Context: The callback executes synchronously immediately after EOF detection, // in the same goroutine as the I/O operation. // // Thread Safety: This method can be called concurrently. The new callback replaces any // previously registered callback atomically. RegisterFctEOF(fct libfpg.FctEOF) // Reset invokes the reset callback with the specified maximum size and current progress. // // This method provides a way to update progress tracking for multi-stage operations or // to report progress relative to a known total size. // // Parameters: // - max: The maximum expected size in bytes, or 0 if the total size is unknown // // Behavior: Invokes the currently registered reset callback with max and the current // cumulative byte count. The cumulative counter is NOT reset by this call. // // Execution Context: The reset callback executes synchronously in the caller's goroutine. // // Thread Safety: This method is thread-safe and can be called concurrently with I/O // operations. The callback retrieval uses atomic.Value.Load() for safe concurrent access. Reset(max int64) } // Reader extends io.ReadCloser with progress tracking capabilities. // // This interface combines standard Go I/O operations with real-time progress monitoring, // allowing applications to track bytes read while maintaining full compatibility with // io.ReadCloser-based code. // // Thread Safety: All operations are thread-safe. The underlying byte counter uses atomic.Int64 // and callback storage uses atomic.Value, ensuring lock-free concurrent access. // // Usage: Wrap any io.ReadCloser with NewReadCloser() to obtain a Reader with progress tracking. type Reader interface { io.ReadCloser Progress } // Writer extends io.WriteCloser with progress tracking capabilities. // // This interface combines standard Go I/O operations with real-time progress monitoring, // allowing applications to track bytes written while maintaining full compatibility with // io.WriteCloser-based code. // // Thread Safety: All operations are thread-safe. The underlying byte counter uses atomic.Int64 // and callback storage uses atomic.Value, ensuring lock-free concurrent access. // // Usage: Wrap any io.WriteCloser with NewWriteCloser() to obtain a Writer with progress tracking. type Writer interface { io.WriteCloser Progress } // NewReadCloser wraps an io.ReadCloser with progress tracking capabilities. // // This function creates a transparent wrapper that monitors all Read() operations and // maintains a cumulative byte counter. It returns a Reader interface that implements // both io.ReadCloser and Progress, enabling real-time progress monitoring without // modifying the underlying reader's behavior. // // Parameters: // - r: The underlying io.ReadCloser to wrap. Must not be nil. // // Returns: // - Reader: A thread-safe wrapper that tracks read operations // // Behavior: // - All Read() operations are delegated to the underlying reader // - Cumulative byte counter is updated atomically after each read // - Registered callbacks are invoked synchronously after each read // - EOF detection triggers the EOF callback if registered // - Close() propagates to the underlying reader // // Thread Safety: The returned Reader is safe for concurrent use. Multiple goroutines // can call Read() simultaneously, and callbacks can be registered/updated during // ongoing I/O operations. All state updates use atomic operations (atomic.Int64 for // counters, atomic.Value for callbacks). // // Lifecycle: The wrapper remains valid until Close() is called. After closing, the // wrapper and underlying reader are both closed and should not be used further. // // Memory Overhead: ~120 bytes (struct size + callback storage) // // Performance Impact: <100ns per Read() call (~4-5% overhead for typical I/O) // // Example: // // file, _ := os.Open("data.bin") // reader := ioprogress.NewReadCloser(file) // defer reader.Close() // Closes both wrapper and file // // var total int64 // reader.RegisterFctIncrement(func(n int64) { // atomic.AddInt64(&total, n) // fmt.Printf("\rRead: %d bytes", atomic.LoadInt64(&total)) // }) // // io.Copy(io.Discard, reader) func NewReadCloser(r io.ReadCloser) Reader { o := &rdr{ r: r, cr: new(atomic.Int64), fi: libatm.NewValue[libfpg.FctIncrement](), fe: libatm.NewValue[libfpg.FctEOF](), fr: libatm.NewValue[libfpg.FctReset](), } // Initialize callbacks with nil to prevent atomic.Value panic. // Nil callbacks are converted to no-op functions by RegisterFct* methods. // This ensures atomic.Value.Store() always receives a valid function pointer, // as atomic.Value.Store(nil) would panic. This initialization also validates // that all atomic.Value fields are properly set before any concurrent access. o.RegisterFctIncrement(nil) o.RegisterFctEOF(nil) o.RegisterFctReset(nil) return o } // NewWriteCloser wraps an io.WriteCloser with progress tracking capabilities. // // This function creates a transparent wrapper that monitors all Write() operations and // maintains a cumulative byte counter. It returns a Writer interface that implements // both io.WriteCloser and Progress, enabling real-time progress monitoring without // modifying the underlying writer's behavior. // // Parameters: // - w: The underlying io.WriteCloser to wrap. Must not be nil. // // Returns: // - Writer: A thread-safe wrapper that tracks write operations // // Behavior: // - All Write() operations are delegated to the underlying writer // - Cumulative byte counter is updated atomically after each write // - Registered callbacks are invoked synchronously after each write // - EOF detection triggers the EOF callback if registered (rare for writers) // - Close() propagates to the underlying writer // // Thread Safety: The returned Writer is safe for concurrent use. Multiple goroutines // can call Write() simultaneously, and callbacks can be registered/updated during // ongoing I/O operations. All state updates use atomic operations (atomic.Int64 for // counters, atomic.Value for callbacks). // // Lifecycle: The wrapper remains valid until Close() is called. After closing, the // wrapper and underlying writer are both closed and should not be used further. // // Memory Overhead: ~120 bytes (struct size + callback storage) // // Performance Impact: <100ns per Write() call (~4-5% overhead for typical I/O) // // Example: // // file, _ := os.Create("output.bin") // writer := ioprogress.NewWriteCloser(file) // defer writer.Close() // Closes both wrapper and file // // var total int64 // writer.RegisterFctIncrement(func(n int64) { // atomic.AddInt64(&total, n) // fmt.Printf("\rWritten: %d bytes", atomic.LoadInt64(&total)) // }) // // io.Copy(writer, source) func NewWriteCloser(w io.WriteCloser) Writer { o := &wrt{ w: w, cr: new(atomic.Int64), fi: libatm.NewValue[libfpg.FctIncrement](), fe: libatm.NewValue[libfpg.FctEOF](), fr: libatm.NewValue[libfpg.FctReset](), } // Initialize callbacks with nil to prevent atomic.Value panic. // Nil callbacks are converted to no-op functions by RegisterFct* methods. // This ensures atomic.Value.Store() always receives a valid function pointer, // as atomic.Value.Store(nil) would panic. This initialization also validates // that all atomic.Value fields are properly set before any concurrent access. o.RegisterFctIncrement(nil) o.RegisterFctEOF(nil) o.RegisterFctReset(nil) return o }