Package ioutils/aggregator:

- FIX: bug in pipeline message handling
- OPTIMIZE: resolve the writer and callback functions once at start,
  outside the run loop, to reduce the cost of calling custom functions
This commit is contained in:
nabbar
2025-12-17 09:09:55 +01:00
parent c16f86b9e6
commit 8b9075280d
7 changed files with 140 additions and 81 deletions

README.md

@@ -1,8 +1,8 @@
# IOUtils Aggregator
-[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.19-blue)](https://go.dev/doc/install)
+[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.18-blue)](https://go.dev/doc/install)
[![License](https://img.shields.io/badge/License-MIT-green.svg)](../../../../LICENSE)
-[![Coverage](https://img.shields.io/badge/Coverage-87.0%25-brightgreen)](TESTING.md)
+[![Coverage](https://img.shields.io/badge/Coverage-85.7%25-brightgreen)](TESTING.md)
Thread-safe write aggregator that serializes concurrent write operations to a single output function with optional periodic callbacks and real-time monitoring.
@@ -309,6 +309,7 @@ package main
import (
	"context"
	"fmt"
+	"time"

	"github.com/nabbar/golib/ioutils/aggregator"
)
@@ -464,7 +465,7 @@ go func() {
### Testing
-The package includes a comprehensive test suite with **87.0% code coverage** and **119 test specifications** using BDD methodology (Ginkgo v2 + Gomega).
+The package includes a comprehensive test suite with **85.7% code coverage** and **119 test specifications** using BDD methodology (Ginkgo v2 + Gomega).
**Key test coverage:**
- ✅ All public APIs and lifecycle operations
@@ -788,7 +789,7 @@ The package is **production-ready** with no urgent improvements or security vuln
### Code Quality Metrics
-- **87.0% test coverage** (target: >80%)
+- **85.7% test coverage** (target: >80%)
- **Zero race conditions** detected with `-race` flag
- **Thread-safe** implementation using atomic operations
- **Panic recovery** in all critical paths
@@ -815,7 +816,7 @@ These are **optional improvements** and not required for production use. The cur
- **[doc.go](doc.go)** - In-depth package documentation including design philosophy, architecture diagrams, buffer sizing formulas, and performance considerations. Provides detailed explanations of internal mechanisms and best practices for production use.
-- **[TESTING.md](TESTING.md)** - Comprehensive test suite documentation covering test architecture, BDD methodology with Ginkgo v2, coverage analysis (87.0%), performance benchmarks, and guidelines for writing new tests. Includes troubleshooting and CI integration examples.
+- **[TESTING.md](TESTING.md)** - Comprehensive test suite documentation covering test architecture, BDD methodology with Ginkgo v2, coverage analysis (85.7%), performance benchmarks, and guidelines for writing new tests. Includes troubleshooting and CI integration examples.
### Related golib Packages

TESTING.md

@@ -1,10 +1,10 @@
# Testing Documentation
[![License](https://img.shields.io/badge/License-MIT-green.svg)](../../../../LICENSE)
-[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.19-blue)](https://go.dev/doc/install)
+[![Go Version](https://img.shields.io/badge/Go-%3E%3D%201.18-blue)](https://go.dev/doc/install)
[![Tests](https://img.shields.io/badge/Tests-119%20specs-success)](aggregator_suite_test.go)
[![Assertions](https://img.shields.io/badge/Assertions-450+-blue)](aggregator_suite_test.go)
-[![Coverage](https://img.shields.io/badge/Coverage-87.0%25-brightgreen)](coverage.out)
+[![Coverage](https://img.shields.io/badge/Coverage-85.7%25-brightgreen)](coverage.out)
Comprehensive testing guide for the `github.com/nabbar/golib/ioutils/aggregator` package using BDD methodology with Ginkgo v2 and Gomega.
@@ -54,8 +54,8 @@ This test suite provides **comprehensive validation** of the `aggregator` packag
### Test Completeness
**Coverage Metrics:**
-- **Code Coverage**: 87.0% of statements (target: >80%)
-- **Branch Coverage**: ~85% of conditional branches
+- **Code Coverage**: 85.7% of statements (target: >80%)
+- **Branch Coverage**: ~84% of conditional branches
- **Function Coverage**: 100% of public functions
- **Race Conditions**: 0 detected across all scenarios
@@ -130,8 +130,8 @@ Passed: 119
Failed: 0
Skipped: 0
Execution Time: ~30 seconds
-Coverage: 87.0% (standard)
-          84.9% (with race detector)
+Coverage: 85.7% (standard)
+          85.7% (with race detector)
Race Conditions: 0
```
@@ -258,8 +258,8 @@ Ran 119 of 119 Specs in 29.096 seconds
SUCCESS! -- 119 Passed | 0 Failed | 0 Pending | 0 Skipped
PASS
-coverage: 87.0% of statements
-ok github.com/nabbar/golib/ioutils/aggregator 30.005s
+coverage: 85.7% of statements
+ok github.com/nabbar/golib/ioutils/aggregator 32.495s
```
---
@@ -270,11 +270,11 @@ ok github.com/nabbar/golib/ioutils/aggregator 30.005s
| Component | File | Coverage | Critical Paths |
|-----------|------|----------|----------------|
-| **Interface** | interface.go | 95.8% | New(), error definitions |
-| **Core Logic** | model.go | 96.3% | run(), metrics tracking |
-| **Writer** | writer.go | 82.4% | Write(), channel management |
-| **Runner** | runner.go | 89.5% | Start(), Stop(), lifecycle |
-| **Context** | context.go | 66.7% | Context interface impl |
+| **Interface** | interface.go | 94.1% | New(), error definitions |
+| **Core Logic** | model.go | 94.8% | run(), metrics tracking |
+| **Writer** | writer.go | 80.5% | Write(), channel management |
+| **Runner** | runner.go | 87.8% | Start(), Stop(), lifecycle |
+| **Context** | context.go | 64.9% | Context interface impl |
| **Config** | config.go | 100% | Validation |
| **Logger** | logger.go | 100% | Error logging |
@@ -300,7 +300,7 @@ Uptime() 100.0% - Duration tracking
### Uncovered Code Analysis
-**Uncovered Lines: 13.0% (target: <20%)**
+**Uncovered Lines: 14.3% (target: <20%)**
#### 1. Context Interface Implementation (context.go)

context.go

@@ -108,7 +108,11 @@ func (o *agg) Value(key any) any {
// cancel function for later use. If there was a previous context, its cancel
// function is called to prevent resource leaks.
func (o *agg) ctxNew(ctx context.Context) {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/ctxnew", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/ctxnew", r)
+		}
+	}()

	if ctx == nil || ctx.Err() != nil {
		ctx = context.Background()
@@ -129,7 +133,11 @@ func (o *agg) ctxNew(ctx context.Context) {
//
// The method is safe to call multiple times.
func (o *agg) ctxClose() {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/ctxclose", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/ctxclose", r)
+		}
+	}()

	// Cancel old context first and clear it atomically
	old := o.n.Swap(func() {})
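Both hunks fix the same Go pitfall: in `defer runner.RecoveryCaller(..., recover())`, the arguments are evaluated when the `defer` statement executes, so `recover()` runs before any panic exists and always returns nil; `recover()` only intercepts a panic when called directly inside the deferred function. A minimal demonstration of both shapes (`recoveryCaller` below is a hypothetical stand-in for `runner.RecoveryCaller`):

```go
package main

import "fmt"

// recoveryCaller is a stand-in for the library's runner.RecoveryCaller.
func recoveryCaller(where string, r any) {
	if r != nil {
		fmt.Printf("recovered in %s: %v\n", where, r)
	}
}

// brokenRecover evaluates recover() at defer-statement time, so it is
// always nil and the panic escapes the function.
func brokenRecover() (panicked bool) {
	defer func() {
		// catch the escaped panic so the demo itself does not crash
		if recover() != nil {
			panicked = true
		}
	}()
	defer recoveryCaller("broken", recover()) // recover() runs NOW, returns nil
	panic("boom")
}

// fixedRecover calls recover() inside the deferred closure,
// where it actually intercepts the in-flight panic.
func fixedRecover() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	defer func() {
		if r := recover(); r != nil {
			recoveryCaller("fixed", r)
		}
	}()
	panic("boom")
}

func main() {
	fmt.Println("broken lets panic escape:", brokenRecover()) // true
	fmt.Println("fixed swallows panic:", fixedRecover())      // false
}
```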

logger.go

@@ -26,12 +26,14 @@
package aggregator
// logError calls the configured error logger if set, otherwise does nothing.
-func (o *agg) logError(msg string, err ...error) {
-	if i := o.le.Load(); i == nil {
+func (o *agg) logError(msg string, err error) {
+	i := o.le.Load()
+	if i == nil || err == nil {
		return
-	} else {
-		i(msg, err...)
	}
+	i(msg, err)
}
// logInfo calls the configured info logger if set, otherwise does nothing.

model.go

@@ -178,13 +178,21 @@ func (o *agg) SizeProcessing() int64 {
// Returns:
// - error: ErrStillRunning if already running, or the context error on cancellation
func (o *agg) run(ctx context.Context) error {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/run", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/run", r)
+		}
+	}()

	var (
		sem libsem.Semaphore
		tckAsc = time.NewTicker(o.at)
		tckSnc = time.NewTicker(o.st)
+		fctWrt func(p []byte) error
+		fctSyn func()
+		fctAsyn func(sem libsem.Semaphore)
	)

	defer func() {
@@ -199,6 +207,11 @@ func (o *agg) run(ctx context.Context) error {
		tckAsc.Stop()
	}()

+	// Check if function write is set
+	if o.fw == nil {
+		return ErrInvalidInstance
+	}
+
	// Check if already running - prevent multi-start
	if o.op.Load() {
		return ErrStillRunning
@@ -209,18 +222,33 @@ func (o *agg) run(ctx context.Context) error {
	o.chanOpen()
	o.cntReset() // Reset counters on start to ensure clean state

+	fctWrt = func(p []byte) error {
+		if len(p) < 1 {
+			return nil
+		} else {
+			o.mw.Lock()
+			defer o.mw.Unlock()
+			_, e := o.fw(p)
+			return e
+		}
+	}
+
+	fctAsyn = o.callASyn()
+	fctSyn = o.callSyn()
+
	sem = libsem.New(context.Background(), o.am, false)
	o.logInfo("starting aggregator")

	for o.Err() == nil {
		select {
		case <-o.Done():
			return o.Err()
		case <-tckAsc.C:
-			o.callASyn(sem)
+			fctAsyn(sem)
		case <-tckSnc.C:
-			o.callSyn()
+			fctSyn()
		case p, ok := <-o.chanData():
			// Decrement counter immediately when data is received from channel
@@ -228,9 +256,9 @@ func (o *agg) run(ctx context.Context) error {
			if !ok {
				// Channel closed, skip this iteration
				continue
-			} else if e := o.fctWrite(p); e != nil {
+			} else {
				// Log write errors but continue processing
-				o.logError("error writing data", e)
+				o.logError("error writing data", fctWrt(p))
			}
		}
	}
@@ -238,30 +266,6 @@ func (o *agg) run(ctx context.Context) error {
	return o.Err()
}

-// fctWrite calls the configured writer function with mutex protection.
-//
-// This ensures that Config.FctWriter is never called concurrently, even though
-// multiple goroutines may be calling Write() simultaneously.
-//
-// Parameters:
-//   - p: Data to write
-//
-// Returns:
-//   - error: nil on success, ErrInvalidInstance if no writer configured, or writer error
-func (o *agg) fctWrite(p []byte) error {
-	o.mw.Lock()
-	defer o.mw.Unlock()
-
-	if len(p) < 1 {
-		return nil
-	} else if o.fw == nil {
-		return ErrInvalidInstance
-	} else {
-		_, e := o.fw(p)
-		return e
-	}
-}

// callASyn invokes the async callback function if configured.
//
// The function is called in a new goroutine and is limited by the semaphore
@@ -272,25 +276,32 @@ func (o *agg) fctWrite(p []byte) error {
//
// Parameters:
// - sem: Semaphore for limiting concurrent workers
-func (o *agg) callASyn(sem libsem.Semaphore) {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/callasyn", recover())
+func (o *agg) callASyn() func(sem libsem.Semaphore) {
	if !o.op.Load() {
-		return
+		return func(sem libsem.Semaphore) {}
	} else if o.af == nil {
-		return
+		return func(sem libsem.Semaphore) {}
	} else if o.x.Load() == nil {
-		return
-	} else if !sem.NewWorkerTry() {
-		// Semaphore full, skip this async call to avoid blocking
-		return
-	} else if e := sem.NewWorker(); e != nil {
-		o.logError("aggregator failed to start new async worker", e)
-		return
-	} else {
+		return func(sem libsem.Semaphore) {}
+	}
+
+	return func(sem libsem.Semaphore) {
+		if !sem.NewWorkerTry() {
+			// Semaphore full, skip this async call to avoid blocking
+			return
+		}
		// Launch async function in new goroutine
		go func() {
+			defer func() {
+				if r := recover(); r != nil {
+					runner.RecoveryCaller("golib/ioutils/aggregator/callasyn", r)
+				}
+			}()
			defer sem.DeferWorker()
			o.af(o.x.Load())
		}()
	}
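The returned closure preserves the non-blocking semaphore behavior: when every worker slot is busy, the tick is skipped instead of stalling the run loop. A standalone sketch of that try-acquire pattern, using a buffered channel as a hypothetical stand-in for `libsem.Semaphore`:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sem is a stand-in for a counting semaphore: a buffered channel whose
// capacity is the maximum number of concurrent workers.
type sem chan struct{}

// tryAcquire mirrors a non-blocking NewWorkerTry: it returns false when
// all worker slots are taken, so the caller can skip instead of blocking.
func (s sem) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

func (s sem) release() { <-s }

func main() {
	s := make(sem, 2) // at most 2 concurrent async callbacks
	var started, skipped atomic.Int32
	var wg sync.WaitGroup

	block := make(chan struct{})
	for i := 0; i < 5; i++ {
		if !s.tryAcquire() {
			skipped.Add(1) // semaphore full: skip this tick, never block
			continue
		}
		started.Add(1)
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer s.release()
			<-block // simulate a long-running callback holding its slot
		}()
	}
	close(block)
	wg.Wait()
	fmt.Printf("started=%d skipped=%d\n", started.Load(), skipped.Load()) // started=2 skipped=3
}
```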
@@ -300,17 +311,24 @@ func (o *agg) callASyn(sem libsem.Semaphore) {
//
// This function is called synchronously (blocking) on the timer tick.
// It should complete quickly to avoid delaying write processing.
-func (o *agg) callSyn() {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/callsyn", recover())
+func (o *agg) callSyn() func() {
	if !o.op.Load() {
-		return
+		return func() {}
	} else if o.sf == nil {
-		return
+		return func() {}
	} else if o.x.Load() == nil {
-		return
+		return func() {}
	}

+	return func() {
+		defer func() {
+			if r := recover(); r != nil {
+				runner.RecoveryCaller("golib/ioutils/aggregator/callsyn", r)
+			}
+		}()
+		o.sf(o.x.Load())
+	}
-	o.sf(o.x.Load())
}
// cntDataInc increments the processing counters when data enters the buffer.
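Both `callASyn` and `callSyn` now validate their configuration once and return a closure, so the hot loop invokes a pre-resolved function instead of re-running the same nil and state checks on every tick. A minimal sketch of the before/after shapes (types here are illustrative):

```go
package main

import "fmt"

type worker struct {
	enabled bool
	cb      func(int)
}

// makeTickFunc resolves configuration once, returning a no-op closure
// when the callback is absent, so the hot loop never re-runs the checks.
func (w *worker) makeTickFunc() func(int) {
	if !w.enabled || w.cb == nil {
		return func(int) {} // decided once, at construction
	}
	return w.cb
}

// naiveTick is the pre-refactor shape: every invocation re-checks state.
func (w *worker) naiveTick(v int) {
	if !w.enabled || w.cb == nil {
		return
	}
	w.cb(v)
}

func main() {
	sum := 0
	w := &worker{enabled: true, cb: func(v int) { sum += v }}

	tick := w.makeTickFunc() // hoisted out of the loop
	for i := 1; i <= 3; i++ {
		tick(i) // hot path: direct call, no re-validation
	}

	w.naiveTick(4)           // old shape: re-checks enabled/cb every call
	fmt.Println("sum:", sum) // 1+2+3+4 = 10

	off := &worker{}
	off.makeTickFunc()(42) // no-op closure, decided at construction
	fmt.Println("sum still:", sum)
}
```

One consequence of this shape is that the configuration captured when the closure is built is not re-read until the loop is restarted, which fits the aggregator's start/stop lifecycle.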

runner.go

@@ -63,7 +63,11 @@ import (
// Note: Start returns immediately. The processing goroutine starts asynchronously.
// A small delay (10ms) is added to allow the goroutine to initialize before returning.
func (o *agg) Start(ctx context.Context) error {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/start", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/start", r)
+		}
+	}()

	r := o.getRunner()
	if r == nil {
@@ -102,7 +106,11 @@ func (o *agg) Start(ctx context.Context) error {
// log.Printf("stop failed: %v", err)
// }
func (o *agg) Stop(ctx context.Context) error {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/stop", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/stop", r)
+		}
+	}()

	r := o.getRunner()
	if r == nil {
@@ -138,7 +146,11 @@ func (o *agg) Stop(ctx context.Context) error {
// log.Printf("restart failed: %v", err)
// }
func (o *agg) Restart(ctx context.Context) error {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/restart", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/restart", r)
+		}
+	}()

	if e := o.Stop(ctx); e != nil {
		return e
@@ -171,7 +183,11 @@ func (o *agg) Restart(ctx context.Context) error {
// }
// agg.Write(data)
func (o *agg) IsRunning() bool {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/isrunning", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/isrunning", r)
+		}
+	}()

	r := o.getRunner()
@@ -223,7 +239,11 @@ func (o *agg) IsRunning() bool {
// log.Printf("aggregator has been running for %v", uptime)
// }
func (o *agg) Uptime() time.Duration {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/uptime", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/uptime", r)
+		}
+	}()

	r := o.getRunner()
	if r == nil {

writer.go

@@ -56,7 +56,11 @@ func (o *agg) Close() error {
// closeRun is the internal close function called by the runner.
// It stops the aggregator, closes the context, and closes the channel.
func (o *agg) closeRun(ctx context.Context) error {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/close", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/close", r)
+		}
+	}()

	var e error
@@ -101,7 +105,11 @@ func (o *agg) closeRun(ctx context.Context) error {
//
// Note: The aggregator must be started with Start() before calling Write.
func (o *agg) Write(p []byte) (n int, err error) {
-	defer runner.RecoveryCaller("golib/ioutils/aggregator/write", recover())
+	defer func() {
+		if r := recover(); r != nil {
+			runner.RecoveryCaller("golib/ioutils/aggregator/write", r)
+		}
+	}()

	// Don't send empty data to channel
	n = len(p)
@@ -125,8 +133,10 @@ func (o *agg) Write(p []byte) (n int, err error) {
	} else {
		// Increment processing counter before sending to channel
		o.cntDataInc(n)

-		// Send to channel (may block if buffer is full)
-		c <- p
+		// using new slice to prevent reset params slice p
+		c <- append(make([]byte, 0, n+1), p[:n]...)

		return len(p), nil
	}
}
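The final hunk addresses slice aliasing: the `io.Writer` contract says implementations must not retain `p`, and the caller may reuse its buffer as soon as `Write` returns, so queueing `p` itself leaves the channel holding memory the caller can overwrite before the run loop consumes it. A minimal demonstration of the bug and the copy fix:

```go
package main

import "fmt"

// sendAliased forwards the caller's slice as-is: the channel now holds a
// reference into the caller's buffer, which the caller may overwrite.
func sendAliased(c chan []byte, p []byte) { c <- p }

// sendCopied mirrors the fix: append into a fresh slice so the queued
// data is immune to later mutation of p.
func sendCopied(c chan []byte, p []byte) {
	c <- append(make([]byte, 0, len(p)), p...)
}

func main() {
	buf := []byte("first")
	c := make(chan []byte, 2)

	sendAliased(c, buf)
	copy(buf, "XXXXX")                        // caller reuses its buffer, as io.Writer allows
	fmt.Println("aliased read:", string(<-c)) // corrupted: XXXXX

	copy(buf, "first")
	sendCopied(c, buf)
	copy(buf, "XXXXX")
	fmt.Println("copied read:", string(<-c)) // intact: first
}
```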