Files
golib/ioutils/aggregator/model.go
nabbar 8b9075280d Package IOutils/aggregator:
- FIX: bugs with pipeline message
- OPTIMZE: optimize calling function to reduce time to call custom
  function
2025-12-17 09:09:55 +01:00

382 lines
10 KiB
Go

/*
* MIT License
*
* Copyright (c) 2025 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/
package aggregator
import (
"context"
"sync"
"sync/atomic"
"time"
libatm "github.com/nabbar/golib/atomic"
"github.com/nabbar/golib/runner"
librun "github.com/nabbar/golib/runner/startStop"
libsem "github.com/nabbar/golib/semaphore"
)
// agg is the internal implementation of the Aggregator interface.
//
// It uses atomic values for thread-safe access to shared state and a mutex
// to protect the writer function from concurrent calls.
//
// Fields:
// - x: Internal context for cancellation propagation
// - n: Context cancel function
// - l: Logger instance
// - r: Runner for lifecycle management
// - at: Async function timer interval
// - am: Max concurrent async functions (-1 = unlimited)
// - af: Async callback function
// - st: Sync function timer interval
// - sf: Sync callback function
// - mw: Mutex protecting fw from concurrent calls
// - fw: Writer function (Config.FctWriter)
// - sh: Channel buffer size
// - ch: Buffered channel for write operations
// - op: Atomic boolean indicating if channel is open
type agg struct {
x libatm.Value[context.Context] // context control
n libatm.Value[context.CancelFunc] // running control
r libatm.Value[librun.StartStop] // runner instance
le libatm.Value[func(msg string, err ...error)] // logger instance
li libatm.Value[func(msg string, arg ...any)] // logger instance
at time.Duration // ticker duration of asynchronous function
am int // maximum asynchronous call in same time
af func(ctx context.Context) // asynchronous function
st time.Duration // ticker duration of synchronous function
sf func(ctx context.Context) // synchronous function
mw sync.Mutex // mutex single call of fw
fw func(p []byte) (n int, err error) // main function write
sh int // size of buffered channel data
ch libatm.Value[chan []byte] // channel data
op *atomic.Bool // channel is closing
cd *atomic.Int64 // counter of message in buffered channel
cw *atomic.Int64 // counter of waiting write to buffered channel
sd *atomic.Int64 // size of message in buffered channel
sw *atomic.Int64 // size of waiting write to buffered channel
}
// SetLoggerError sets a custom error logging function for the aggregator.
//
// The provided function will be called whenever an error occurs during internal
// operations, such as write failures or context cancellation.
//
// Parameters:
// - f: The error logging function. If nil, a no-op function is used.
//
// This method is safe for concurrent use.
//
// Example:
//
// agg.SetLoggerError(func(msg string, err ...error) {
// log.Printf("ERROR: %s: %v", msg, err)
// })
func (a *agg) SetLoggerError(f func(msg string, err ...error)) {
if f == nil {
a.le.Store(func(msg string, err ...error) {})
return
}
a.le.Store(f)
}
// SetLoggerInfo sets a custom info logging function for the aggregator.
//
// The provided function will be called for informational messages during
// normal operations, such as start/stop events.
//
// Parameters:
// - f: The info logging function. If nil, a no-op function is used.
//
// This method is safe for concurrent use.
//
// Example:
//
// agg.SetLoggerInfo(func(msg string, arg ...any) {
// log.Printf("INFO: "+msg, arg...)
// })
func (a *agg) SetLoggerInfo(f func(msg string, arg ...any)) {
if f == nil {
a.li.Store(func(msg string, arg ...any) {})
return
}
a.li.Store(f)
}
// NbWaiting returns the number of Write() calls currently waiting to send data to the channel.
// See Aggregator.NbWaiting() for details.
func (o *agg) NbWaiting() int64 {
return o.cw.Load()
}
// SizeWaiting returns the total size in bytes of blocked Write() calls.
// See Aggregator.SizeWaiting() for details.
func (o *agg) SizeWaiting() int64 {
return o.sw.Load()
}
// NbProcessing returns the number of items buffered in the channel waiting to be processed.
// See Aggregator.NbProcessing() for details.
func (o *agg) NbProcessing() int64 {
return o.cd.Load()
}
// SizeProcessing returns the total size in bytes of buffered data items.
// See Aggregator.SizeProcessing() for details.
func (o *agg) SizeProcessing() int64 {
return o.sd.Load()
}
// run is the main processing loop that handles write operations and periodic callbacks.
//
// This function:
// 1. Checks for multi-start condition and returns ErrStillRunning if already running
// 2. Initializes the internal context and opens the write channel
// 3. Creates a semaphore for limiting concurrent async function calls
// 4. Enters the main select loop to process:
// - Context cancellation (via Done channel)
// - Async callback timer ticks
// - Sync callback timer ticks
// - Write data from the channel
//
// The function runs until the context is cancelled or an error occurs.
// All cleanup is handled in the deferred function.
//
// Parameters:
// - ctx: Parent context for the processing loop
//
// Returns:
// - error: ErrStillRunning if already running, or the context error on cancellation
func (o *agg) run(ctx context.Context) error {
defer func() {
if r := recover(); r != nil {
runner.RecoveryCaller("golib/ioutils/aggregator/run", r)
}
}()
var (
sem libsem.Semaphore
tckAsc = time.NewTicker(o.at)
tckSnc = time.NewTicker(o.st)
fctWrt func(p []byte) error
fctSyn func()
fctAsyn func(sem libsem.Semaphore)
)
defer func() {
// Cleanup: release semaphore, close aggregator, stop timers
if sem != nil {
sem.DeferMain()
}
_ = o.Close()
o.logInfo("stopping aggregator")
tckSnc.Stop()
tckAsc.Stop()
}()
// Check if function write is set
if o.fw == nil {
return ErrInvalidInstance
}
// Check if already running - prevent multi-start
if o.op.Load() {
return ErrStillRunning
}
// Initialize context and open channel (which sets op to true)
o.ctxNew(ctx)
o.chanOpen()
o.cntReset() // Reset counters on start to ensure clean state
fctWrt = func(p []byte) error {
if len(p) < 1 {
return nil
} else {
o.mw.Lock()
defer o.mw.Unlock()
_, e := o.fw(p)
return e
}
}
fctAsyn = o.callASyn()
fctSyn = o.callSyn()
sem = libsem.New(context.Background(), o.am, false)
o.logInfo("starting aggregator")
for o.Err() == nil {
select {
case <-o.Done():
return o.Err()
case <-tckAsc.C:
fctAsyn(sem)
case <-tckSnc.C:
fctSyn()
case p, ok := <-o.chanData():
// Decrement counter immediately when data is received from channel
o.cntDataDec(len(p))
if !ok {
// Channel closed, skip this iteration
continue
} else {
// Log write errors but continue processing
o.logError("error writing data", fctWrt(p))
}
}
}
return o.Err()
}
// callASyn invokes the async callback function if configured.
//
// The function is called in a new goroutine and is limited by the semaphore
// to prevent too many concurrent async calls (respecting Config.AsyncMax).
//
// If the semaphore is full (max workers reached), the call is skipped.
// This prevents blocking the main processing loop.
//
// Parameters:
// - sem: Semaphore for limiting concurrent workers
func (o *agg) callASyn() func(sem libsem.Semaphore) {
if !o.op.Load() {
return func(sem libsem.Semaphore) {}
} else if o.af == nil {
return func(sem libsem.Semaphore) {}
} else if o.x.Load() == nil {
return func(sem libsem.Semaphore) {}
}
return func(sem libsem.Semaphore) {
if !sem.NewWorkerTry() {
// Semaphore full, skip this async call to avoid blocking
return
}
// Launch async function in new goroutine
go func() {
defer func() {
if r := recover(); r != nil {
runner.RecoveryCaller("golib/ioutils/aggregator/callasyn", r)
}
}()
defer sem.DeferWorker()
o.af(o.x.Load())
}()
}
}
// callSyn invokes the sync callback function if configured.
//
// This function is called synchronously (blocking) on the timer tick.
// It should complete quickly to avoid delaying write processing.
func (o *agg) callSyn() func() {
if !o.op.Load() {
return func() {}
} else if o.sf == nil {
return func() {}
} else if o.x.Load() == nil {
return func() {}
}
return func() {
defer func() {
if r := recover(); r != nil {
runner.RecoveryCaller("golib/ioutils/aggregator/callsyn", r)
}
}()
o.sf(o.x.Load())
}
}
// cntDataInc increments the processing counters when data enters the buffer.
// It tracks both the number of items and total bytes in the channel.
func (o *agg) cntDataInc(i int) {
o.cd.Add(1)
o.sd.Add(int64(i))
}
// cntDataDec decrements the processing counters when data is consumed from the buffer.
// It ensures counters never go negative by resetting to 0 if needed.
func (o *agg) cntDataDec(i int) {
o.cd.Add(-1)
if j := o.cd.Load(); j < 0 {
o.cd.Store(0)
}
o.sd.Add(int64(-i))
if j := o.sd.Load(); j < 0 {
o.sd.Store(0)
}
}
// cntWaitInc increments the waiting counters when a Write() call blocks.
// It tracks both the number of blocked writes and total bytes waiting.
func (o *agg) cntWaitInc(i int) {
o.cw.Add(1)
o.sw.Add(int64(i))
}
// cntWaitDec decrements the waiting counters when a blocked Write() proceeds.
// It ensures counters never go negative by resetting to 0 if needed.
func (o *agg) cntWaitDec(i int) {
o.cw.Add(-1)
if j := o.cw.Load(); j < 0 {
o.cw.Store(0)
}
o.sw.Add(int64(-i))
if j := o.sw.Load(); j < 0 {
o.sw.Store(0)
}
}
// cntReset resets all counters to zero.
// Called when the aggregator starts to ensure clean state.
func (o *agg) cntReset() {
o.cd.Store(0)
o.sd.Store(0)
o.cw.Store(0)
o.sw.Store(0)
}