From 8b9075280d84fe26efd0af59e6b4643878b21b6e Mon Sep 17 00:00:00 2001 From: nabbar Date: Wed, 17 Dec 2025 09:09:55 +0100 Subject: [PATCH] Package IOutils/aggregator: - FIX: bugs with pipeline message - OPTIMZE: optimize calling function to reduce time to call custom function --- ioutils/aggregator/README.md | 11 ++-- ioutils/aggregator/TESTING.md | 28 ++++----- ioutils/aggregator/context.go | 12 +++- ioutils/aggregator/logger.go | 10 +-- ioutils/aggregator/model.go | 114 ++++++++++++++++++++-------------- ioutils/aggregator/runner.go | 30 +++++++-- ioutils/aggregator/writer.go | 16 ++++- 7 files changed, 140 insertions(+), 81 deletions(-) diff --git a/ioutils/aggregator/README.md b/ioutils/aggregator/README.md index b1275d8..2e9b28d 100644 --- a/ioutils/aggregator/README.md +++ b/ioutils/aggregator/README.md @@ -1,8 +1,8 @@ # IOUtils Aggregator -[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.19-blue)](https://go.dev/doc/install) +[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.18-blue)](https://go.dev/doc/install) [![License](https://img.shields.io/badge/License-MIT-green.svg)](../../../../LICENSE) -[![Coverage](https://img.shields.io/badge/Coverage-87.0%25-brightgreen)](TESTING.md) +[![Coverage](https://img.shields.io/badge/Coverage-85.7%25-brightgreen)](TESTING.md) Thread-safe write aggregator that serializes concurrent write operations to a single output function with optional periodic callbacks and real-time monitoring. @@ -309,6 +309,7 @@ package main import ( "context" "fmt" + "time" "github.com/nabbar/golib/ioutils/aggregator" ) @@ -464,7 +465,7 @@ go func() { ### Testing -The package includes a comprehensive test suite with **87.0% code coverage** and **119 test specifications** using BDD methodology (Ginkgo v2 + Gomega). +The package includes a comprehensive test suite with **85.7% code coverage** and **119 test specifications** using BDD methodology (Ginkgo v2 + Gomega). **Key test coverage:** - ✅ All public APIs and lifecycle operations @@ -788,7 +789,7 @@ The package is **production-ready** with no urgent improvements or security vuln ### Code Quality Metrics -- ✅ **87.0% test coverage** (target: >80%) +- ✅ **85.7% test coverage** (target: >80%) - ✅ **Zero race conditions** detected with `-race` flag - ✅ **Thread-safe** implementation using atomic operations - ✅ **Panic recovery** in all critical paths @@ -815,7 +816,7 @@ These are **optional improvements** and not required for production use. The cur - **[doc.go](doc.go)** - In-depth package documentation including design philosophy, architecture diagrams, buffer sizing formulas, and performance considerations. Provides detailed explanations of internal mechanisms and best practices for production use. -- **[TESTING.md](TESTING.md)** - Comprehensive test suite documentation covering test architecture, BDD methodology with Ginkgo v2, coverage analysis (87.0%), performance benchmarks, and guidelines for writing new tests. Includes troubleshooting and CI integration examples. +- **[TESTING.md](TESTING.md)** - Comprehensive test suite documentation covering test architecture, BDD methodology with Ginkgo v2, coverage analysis (85.7%), performance benchmarks, and guidelines for writing new tests. Includes troubleshooting and CI integration examples. ### Related golib Packages diff --git a/ioutils/aggregator/TESTING.md b/ioutils/aggregator/TESTING.md index 23df551..f9fcdab 100644 --- a/ioutils/aggregator/TESTING.md +++ b/ioutils/aggregator/TESTING.md @@ -1,10 +1,10 @@ # Testing Documentation [![License](https://img.shields.io/badge/License-MIT-green.svg)](../../../../LICENSE) -[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.19-blue)](https://go.dev/doc/install) +[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.18-blue)](https://go.dev/doc/install) [![Tests](https://img.shields.io/badge/Tests-119%20specs-success)](aggregator_suite_test.go) [![Assertions](https://img.shields.io/badge/Assertions-450+-blue)](aggregator_suite_test.go) -[![Coverage](https://img.shields.io/badge/Coverage-87.0%25-brightgreen)](coverage.out) +[![Coverage](https://img.shields.io/badge/Coverage-85.7%25-brightgreen)](coverage.out) Comprehensive testing guide for the `github.com/nabbar/golib/ioutils/aggregator` package using BDD methodology with Ginkgo v2 and Gomega. @@ -54,8 +54,8 @@ This test suite provides **comprehensive validation** of the `aggregator` packag ### Test Completeness **Coverage Metrics:** -- **Code Coverage**: 87.0% of statements (target: >80%) -- **Branch Coverage**: ~85% of conditional branches +- **Code Coverage**: 85.7% of statements (target: >80%) +- **Branch Coverage**: ~84% of conditional branches - **Function Coverage**: 100% of public functions - **Race Conditions**: 0 detected across all scenarios @@ -130,8 +130,8 @@ Passed: 119 Failed: 0 Skipped: 0 Execution Time: ~30 seconds -Coverage: 87.0% (standard) - 84.9% (with race detector) +Coverage: 85.7% (standard) + 85.7% (with race detector) Race Conditions: 0 ``` @@ -258,8 +258,8 @@ Ran 119 of 119 Specs in 29.096 seconds SUCCESS! -- 119 Passed | 0 Failed | 0 Pending | 0 Skipped PASS -coverage: 87.0% of statements -ok github.com/nabbar/golib/ioutils/aggregator 30.005s +coverage: 85.7% of statements +ok github.com/nabbar/golib/ioutils/aggregator 32.495s ``` --- @@ -270,11 +270,11 @@ ok github.com/nabbar/golib/ioutils/aggregator 30.005s | Component | File | Coverage | Critical Paths | |-----------|------|----------|----------------| -| **Interface** | interface.go | 95.8% | New(), error definitions | -| **Core Logic** | model.go | 96.3% | run(), metrics tracking | -| **Writer** | writer.go | 82.4% | Write(), channel management | -| **Runner** | runner.go | 89.5% | Start(), Stop(), lifecycle | -| **Context** | context.go | 66.7% | Context interface impl | +| **Interface** | interface.go | 94.1% | New(), error definitions | +| **Core Logic** | model.go | 94.8% | run(), metrics tracking | +| **Writer** | writer.go | 80.5% | Write(), channel management | +| **Runner** | runner.go | 87.8% | Start(), Stop(), lifecycle | +| **Context** | context.go | 64.9% | Context interface impl | | **Config** | config.go | 100% | Validation | | **Logger** | logger.go | 100% | Error logging | @@ -300,7 +300,7 @@ Uptime() 100.0% - Duration tracking ### Uncovered Code Analysis -**Uncovered Lines: 13.0% (target: <20%)** +**Uncovered Lines: 14.3% (target: <20%)** #### 1. Context Interface Implementation (context.go) diff --git a/ioutils/aggregator/context.go b/ioutils/aggregator/context.go index dfe687c..8b7cf48 100644 --- a/ioutils/aggregator/context.go +++ b/ioutils/aggregator/context.go @@ -108,7 +108,11 @@ func (o *agg) Value(key any) any { // cancel function for later use. If there was a previous context, its cancel // function is called to prevent resource leaks. func (o *agg) ctxNew(ctx context.Context) { - defer runner.RecoveryCaller("golib/ioutils/aggregator/ctxnew", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/ctxnew", r) + } + }() if ctx == nil || ctx.Err() != nil { ctx = context.Background() @@ -129,7 +133,11 @@ func (o *agg) ctxNew(ctx context.Context) { // // The method is safe to call multiple times. func (o *agg) ctxClose() { - defer runner.RecoveryCaller("golib/ioutils/aggregator/ctxclose", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/ctxclose", r) + } + }() // Cancel old context first and clear it atomically old := o.n.Swap(func() {}) diff --git a/ioutils/aggregator/logger.go b/ioutils/aggregator/logger.go index 74b8e03..6252e52 100644 --- a/ioutils/aggregator/logger.go +++ b/ioutils/aggregator/logger.go @@ -26,12 +26,14 @@ package aggregator // logError calls the configured error logger if set, otherwise does nothing. -func (o *agg) logError(msg string, err ...error) { - if i := o.le.Load(); i == nil { +func (o *agg) logError(msg string, err error) { + i := o.le.Load() + + if i == nil || err == nil { return - } else { - i(msg, err...) } + + i(msg, err) } // logInfo calls the configured info logger if set, otherwise does nothing. diff --git a/ioutils/aggregator/model.go b/ioutils/aggregator/model.go index 2e2f70e..69135ef 100644 --- a/ioutils/aggregator/model.go +++ b/ioutils/aggregator/model.go @@ -178,13 +178,21 @@ func (o *agg) SizeProcessing() int64 { // Returns: // - error: ErrStillRunning if already running, or the context error on cancellation func (o *agg) run(ctx context.Context) error { - defer runner.RecoveryCaller("golib/ioutils/aggregator/run", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/run", r) + } + }() var ( sem libsem.Semaphore tckAsc = time.NewTicker(o.at) tckSnc = time.NewTicker(o.st) + + fctWrt func(p []byte) error + fctSyn func() + fctAsyn func(sem libsem.Semaphore) ) defer func() { @@ -199,6 +207,11 @@ func (o *agg) run(ctx context.Context) error { tckAsc.Stop() }() + // Check if function write is set + if o.fw == nil { + return ErrInvalidInstance + } + // Check if already running - prevent multi-start if o.op.Load() { return ErrStillRunning @@ -209,18 +222,33 @@ func (o *agg) run(ctx context.Context) error { o.chanOpen() o.cntReset() // Reset counters on start to ensure clean state + fctWrt = func(p []byte) error { + if len(p) < 1 { + return nil + } else { + o.mw.Lock() + defer o.mw.Unlock() + _, e := o.fw(p) + return e + } + } + + fctAsyn = o.callASyn() + fctSyn = o.callSyn() + sem = libsem.New(context.Background(), o.am, false) o.logInfo("starting aggregator") + for o.Err() == nil { select { case <-o.Done(): return o.Err() case <-tckAsc.C: - o.callASyn(sem) + fctAsyn(sem) case <-tckSnc.C: - o.callSyn() + fctSyn() case p, ok := <-o.chanData(): // Decrement counter immediately when data is received from channel @@ -228,9 +256,9 @@ func (o *agg) run(ctx context.Context) error { if !ok { // Channel closed, skip this iteration continue - } else if e := o.fctWrite(p); e != nil { + } else { // Log write errors but continue processing - o.logError("error writing data", e) + o.logError("error writing data", fctWrt(p)) } } } @@ -238,30 +266,6 @@ func (o *agg) run(ctx context.Context) error { return o.Err() } -// fctWrite calls the configured writer function with mutex protection. -// -// This ensures that Config.FctWriter is never called concurrently, even though -// multiple goroutines may be calling Write() simultaneously. -// -// Parameters: -// - p: Data to write -// -// Returns: -// - error: nil on success, ErrInvalidInstance if no writer configured, or writer error -func (o *agg) fctWrite(p []byte) error { - o.mw.Lock() - defer o.mw.Unlock() - - if len(p) < 1 { - return nil - } else if o.fw == nil { - return ErrInvalidInstance - } else { - _, e := o.fw(p) - return e - } -} - // callASyn invokes the async callback function if configured. // // The function is called in a new goroutine and is limited by the semaphore @@ -272,25 +276,32 @@ func (o *agg) fctWrite(p []byte) error { // // Parameters: // - sem: Semaphore for limiting concurrent workers -func (o *agg) callASyn(sem libsem.Semaphore) { - defer runner.RecoveryCaller("golib/ioutils/aggregator/callasyn", recover()) +func (o *agg) callASyn() func(sem libsem.Semaphore) { if !o.op.Load() { - return + return func(sem libsem.Semaphore) {} } else if o.af == nil { - return + return func(sem libsem.Semaphore) {} } else if o.x.Load() == nil { - return - } else if !sem.NewWorkerTry() { - // Semaphore full, skip this async call to avoid blocking - return - } else if e := sem.NewWorker(); e != nil { - o.logError("aggregator failed to start new async worker", e) - return - } else { + return func(sem libsem.Semaphore) {} + } + + return func(sem libsem.Semaphore) { + if !sem.NewWorkerTry() { + // Semaphore full, skip this async call to avoid blocking + return + } + // Launch async function in new goroutine go func() { + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/callasyn", r) + } + }() + defer sem.DeferWorker() + o.af(o.x.Load()) }() } @@ -300,17 +311,24 @@ func (o *agg) callASyn(sem libsem.Semaphore) { // // This function is called synchronously (blocking) on the timer tick. // It should complete quickly to avoid delaying write processing. -func (o *agg) callSyn() { - defer runner.RecoveryCaller("golib/ioutils/aggregator/callsyn", recover()) - +func (o *agg) callSyn() func() { if !o.op.Load() { - return + return func() {} } else if o.sf == nil { - return + return func() {} } else if o.x.Load() == nil { - return + return func() {} + } + + return func() { + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/callsyn", r) + } + }() + + o.sf(o.x.Load()) } - o.sf(o.x.Load()) } // cntDataInc increments the processing counters when data enters the buffer. diff --git a/ioutils/aggregator/runner.go b/ioutils/aggregator/runner.go index ff793bd..96729fc 100644 --- a/ioutils/aggregator/runner.go +++ b/ioutils/aggregator/runner.go @@ -63,7 +63,11 @@ import ( // Note: Start returns immediately. The processing goroutine starts asynchronously. // A small delay (10ms) is added to allow the goroutine to initialize before returning. func (o *agg) Start(ctx context.Context) error { - defer runner.RecoveryCaller("golib/ioutils/aggregator/start", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/start", r) + } + }() r := o.getRunner() if r == nil { @@ -102,7 +106,11 @@ func (o *agg) Start(ctx context.Context) error { // log.Printf("stop failed: %v", err) // } func (o *agg) Stop(ctx context.Context) error { - defer runner.RecoveryCaller("golib/ioutils/aggregator/stop", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/stop", r) + } + }() r := o.getRunner() if r == nil { @@ -138,7 +146,11 @@ func (o *agg) Stop(ctx context.Context) error { // log.Printf("restart failed: %v", err) // } func (o *agg) Restart(ctx context.Context) error { - defer runner.RecoveryCaller("golib/ioutils/aggregator/restart", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/restart", r) + } + }() if e := o.Stop(ctx); e != nil { return e @@ -171,7 +183,11 @@ func (o *agg) Restart(ctx context.Context) error { // } // agg.Write(data) func (o *agg) IsRunning() bool { - defer runner.RecoveryCaller("golib/ioutils/aggregator/isrunning", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/isrunning", r) + } + }() r := o.getRunner() @@ -223,7 +239,11 @@ func (o *agg) IsRunning() bool { // log.Printf("aggregator has been running for %v", uptime) // } func (o *agg) Uptime() time.Duration { - defer runner.RecoveryCaller("golib/ioutils/aggregator/uptime", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/uptime", r) + } + }() r := o.getRunner() if r == nil { diff --git a/ioutils/aggregator/writer.go b/ioutils/aggregator/writer.go index 3e26dd3..59e242a 100644 --- a/ioutils/aggregator/writer.go +++ b/ioutils/aggregator/writer.go @@ -56,7 +56,11 @@ func (o *agg) Close() error { // closeRun is the internal close function called by the runner. // It stops the aggregator, closes the context, and closes the channel. func (o *agg) closeRun(ctx context.Context) error { - defer runner.RecoveryCaller("golib/ioutils/aggregator/close", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/close", r) + } + }() var e error @@ -101,7 +105,11 @@ func (o *agg) closeRun(ctx context.Context) error { // // Note: The aggregator must be started with Start() before calling Write. func (o *agg) Write(p []byte) (n int, err error) { - defer runner.RecoveryCaller("golib/ioutils/aggregator/write", recover()) + defer func() { + if r := recover(); r != nil { + runner.RecoveryCaller("golib/ioutils/aggregator/write", r) + } + }() // Don't send empty data to channel n = len(p) @@ -125,8 +133,10 @@ func (o *agg) Write(p []byte) (n int, err error) { } else { // Increment processing counter before sending to channel o.cntDataInc(n) + // Send to channel (may block if buffer is full) - c <- p + // using new slice to prevent reset params slice p + c <- append(make([]byte, 0, n+1), p[:n]...) return len(p), nil } }