package astikit

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// Stat names
const (
	StatNameWorkRatio = "astikit.work.ratio"
)

// Chan constants
const (
	// Calling Add() only blocks if the chan has been started and the ctx
	// has not been canceled
	ChanAddStrategyBlockWhenStarted = "block.when.started"
	// Calling Add() never blocks
	ChanAddStrategyNoBlock = "no.block"
	ChanOrderFIFO          = "fifo"
	ChanOrderFILO          = "filo"
)

// Chan is an object capable of executing funcs in a specific order while controlling the conditions
// in which adding new funcs is blocking
// Check out ChanOptions for detailed options
type Chan struct {
	cancel           context.CancelFunc
	c                *sync.Cond
	ctx              context.Context
	fs               []func()
	mc               *sync.Mutex // Locks ctx
	mf               *sync.Mutex // Locks fs
	o                ChanOptions
	running          uint32
	statWorkDuration *AtomicDuration
}

// ChanOptions are Chan options
type ChanOptions struct {
	// Determines the conditions in which Add() blocks. See constants with pattern ChanAddStrategy*
	// Default is ChanAddStrategyNoBlock
	AddStrategy string
	// Order in which the funcs will be processed. See constants with pattern ChanOrder*
	// Default is ChanOrderFIFO
	Order string
	// By default, funcs not yet processed when the context is cancelled are dropped.
	// If "ProcessAll" is true, ALL funcs are processed even after the context is cancelled.
	// However, no funcs can be added after the context is cancelled
	ProcessAll bool
}

// NewChan creates a new Chan
func NewChan(o ChanOptions) *Chan {
	return &Chan{
		c:                sync.NewCond(&sync.Mutex{}),
		mc:               &sync.Mutex{},
		mf:               &sync.Mutex{},
		o:                o,
		statWorkDuration: NewAtomicDuration(0),
	}
}

// Start starts the chan by looping through functions in the buffer and
// executing them if any, or waiting for a new one otherwise
func (c *Chan) Start(ctx context.Context) {
	// Make sure to start only once
	if atomic.CompareAndSwapUint32(&c.running, 0, 1) {
		// Update status
		defer atomic.StoreUint32(&c.running, 0)

		// Create context
		c.mc.Lock()
		c.ctx, c.cancel = context.WithCancel(ctx)
		d := c.ctx.Done()
		c.mc.Unlock()

		// Handle context
		go func() {
			// Wait for context to be done
			<-d

			// Signal
			c.c.L.Lock()
			c.c.Signal()
			c.c.L.Unlock()
		}()

		// Loop
		for {
			// Lock cond here in case a func is added between retrieving l and doing the if on it
			c.c.L.Lock()

			// Get number of funcs in buffer
			c.mf.Lock()
			l := len(c.fs)
			c.mf.Unlock()

			// Only return if the context has been cancelled and either:
			// - the user wants to drop funcs that have not yet been processed
			// - the buffer is empty
			c.mc.Lock()
			if c.ctx.Err() != nil && (!c.o.ProcessAll || l == 0) {
				c.mc.Unlock()
				c.c.L.Unlock()
				return
			}
			c.mc.Unlock()

			// No funcs in buffer
			if l == 0 {
				c.c.Wait()
				c.c.L.Unlock()
				continue
			}
			c.c.L.Unlock()

			// Get first func
			c.mf.Lock()
			fn := c.fs[0]
			c.mf.Unlock()

			// Execute func
			n := time.Now()
			fn()
			c.statWorkDuration.Add(time.Since(n))

			// Remove first func
			c.mf.Lock()
			c.fs = c.fs[1:]
			c.mf.Unlock()
		}
	}
}

// Stop stops the chan
func (c *Chan) Stop() {
	c.mc.Lock()
	if c.cancel != nil {
		c.cancel()
	}
	c.mc.Unlock()
}

// Add adds a new func to the chan
func (c *Chan) Add(i func()) {
	// Check context
	c.mc.Lock()
	if c.ctx != nil && c.ctx.Err() != nil {
		c.mc.Unlock()
		return
	}
	c.mc.Unlock()

	// Wrap the function
	var fn func()
	var wg *sync.WaitGroup
	if c.o.AddStrategy == ChanAddStrategyBlockWhenStarted {
		wg = &sync.WaitGroup{}
		wg.Add(1)
		fn = func() {
			defer wg.Done()
			i()
		}
	} else {
		fn = i
	}

	// Add func to buffer
	c.mf.Lock()
	if c.o.Order == ChanOrderFILO {
		c.fs = append([]func(){fn}, c.fs...)
	} else {
		c.fs = append(c.fs, fn)
	}
	c.mf.Unlock()

	// Signal
	c.c.L.Lock()
	c.c.Signal()
	c.c.L.Unlock()

	// Wait
	if wg != nil {
		wg.Wait()
	}
}

// Reset resets the chan
func (c *Chan) Reset() {
	c.mf.Lock()
	defer c.mf.Unlock()
	c.fs = []func(){}
}

// ChanStats represents the chan stats
type ChanStats struct {
	WorkDuration time.Duration
}

// Stats returns the chan stats
func (c *Chan) Stats() ChanStats {
	return ChanStats{WorkDuration: c.statWorkDuration.Duration()}
}

// StatOptions returns the chan stat options
func (c *Chan) StatOptions() []StatOptions {
	return []StatOptions{
		{
			Metadata: &StatMetadata{
				Description: "Percentage of time doing work",
				Label:       "Work ratio",
				Name:        StatNameWorkRatio,
				Unit:        "%",
			},
			Valuer: NewAtomicDurationPercentageStat(c.statWorkDuration),
		},
	}
}
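
// exampleChanUsage is a minimal, illustrative sketch (the function name is
// arbitrary, not part of the library): it starts a Chan in a goroutine, adds a
// few funcs and stops it once they are done. With
// ChanAddStrategyBlockWhenStarted, each Add() call only returns once the added
// func has been executed, so by the time the loop exits all funcs have run.
func exampleChanUsage() {
	c := NewChan(ChanOptions{AddStrategy: ChanAddStrategyBlockWhenStarted})
	go c.Start(context.Background())
	for i := 0; i < 3; i++ {
		i := i // capture the loop variable for the deferred execution
		c.Add(func() { fmt.Println("processed", i) })
	}
	c.Stop()
	fmt.Println("work duration:", c.Stats().WorkDuration)
}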

// BufferPool represents a *bytes.Buffer pool
type BufferPool struct {
	bp *sync.Pool
}

// NewBufferPool creates a new BufferPool
func NewBufferPool() *BufferPool {
	return &BufferPool{bp: &sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}}
}

// New creates a new BufferPoolItem
func (p *BufferPool) New() *BufferPoolItem {
	return newBufferPoolItem(p.bp.Get().(*bytes.Buffer), p.bp)
}

// BufferPoolItem represents a BufferPool item
type BufferPoolItem struct {
	*bytes.Buffer
	bp *sync.Pool
}

func newBufferPoolItem(b *bytes.Buffer, bp *sync.Pool) *BufferPoolItem {
	return &BufferPoolItem{
		Buffer: b,
		bp:     bp,
	}
}

// Close implements the io.Closer interface
func (i *BufferPoolItem) Close() error {
	i.Reset()
	i.bp.Put(i.Buffer)
	return nil
}
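
// exampleBufferPoolUsage is a minimal, illustrative sketch (the function name
// is arbitrary): it borrows a buffer from the pool, writes to it and returns
// it by closing it. Close() resets the buffer before putting it back, so it
// always comes out of the pool empty.
func exampleBufferPoolUsage() {
	p := NewBufferPool()
	b := p.New()
	defer b.Close() // returns the underlying *bytes.Buffer to the pool
	b.WriteString("hello")
	fmt.Println(b.String())
}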

// GoroutineLimiter is an object capable of executing funcs in parallel while
// keeping the number of funcs running in parallel under a threshold
type GoroutineLimiter struct {
	busy   int
	c      *sync.Cond
	ctx    context.Context
	cancel context.CancelFunc
	o      GoroutineLimiterOptions
}

// GoroutineLimiterOptions represents GoroutineLimiter options
type GoroutineLimiterOptions struct {
	Max int
}

// NewGoroutineLimiter creates a new GoroutineLimiter
func NewGoroutineLimiter(o GoroutineLimiterOptions) (l *GoroutineLimiter) {
	l = &GoroutineLimiter{
		c: sync.NewCond(&sync.Mutex{}),
		o: o,
	}
	if l.o.Max <= 0 {
		l.o.Max = 1
	}
	l.ctx, l.cancel = context.WithCancel(context.Background())
	go l.handleCtx()
	return
}

// Close closes the limiter properly
func (l *GoroutineLimiter) Close() error {
	l.cancel()
	return nil
}

func (l *GoroutineLimiter) handleCtx() {
	<-l.ctx.Done()
	l.c.L.Lock()
	l.c.Broadcast()
	l.c.L.Unlock()
}

// GoroutineLimiterFunc is a GoroutineLimiter func
type GoroutineLimiterFunc func()

// Do executes custom work in a goroutine
func (l *GoroutineLimiter) Do(fn GoroutineLimiterFunc) (err error) {
	// Check context in case the limiter has already been closed
	if err = l.ctx.Err(); err != nil {
		return
	}

	// Lock
	l.c.L.Lock()

	// Wait for a goroutine to be available
	for l.busy >= l.o.Max {
		l.c.Wait()
	}

	// Check context in case the limiter has been closed while waiting.
	// Make sure to unlock before returning, otherwise the cond's lock
	// would be held forever
	if err = l.ctx.Err(); err != nil {
		l.c.L.Unlock()
		return
	}

	// Increment
	l.busy++

	// Unlock
	l.c.L.Unlock()

	// Execute in a goroutine
	go func() {
		// Decrement
		defer func() {
			l.c.L.Lock()
			l.busy--
			l.c.Signal()
			l.c.L.Unlock()
		}()

		// Execute
		fn()
	}()
	return
}
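
// exampleGoroutineLimiterUsage is a minimal, illustrative sketch (the function
// name is arbitrary): it runs 10 funcs with at most 2 executing at the same
// time, waits for all of them, then closes the limiter.
func exampleGoroutineLimiterUsage() {
	l := NewGoroutineLimiter(GoroutineLimiterOptions{Max: 2})
	defer l.Close()
	wg := &sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		i := i // capture the loop variable for the deferred execution
		wg.Add(1)
		if err := l.Do(func() {
			defer wg.Done()
			fmt.Println("doing work", i)
		}); err != nil {
			wg.Done() // the limiter was closed, the func will not run
		}
	}
	wg.Wait()
}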

// Eventer represents an object that can dispatch simple events (name + payload)
type Eventer struct {
	c  *Chan
	hs map[string][]EventerHandler
	mh *sync.Mutex
}

// EventerOptions represents Eventer options
type EventerOptions struct {
	Chan ChanOptions
}

// EventerHandler represents a function that can handle the payload of an event
type EventerHandler func(payload interface{})

// NewEventer creates a new eventer
func NewEventer(o EventerOptions) *Eventer {
	return &Eventer{
		c:  NewChan(o.Chan),
		hs: make(map[string][]EventerHandler),
		mh: &sync.Mutex{},
	}
}

// On adds a handler for a specific name
func (e *Eventer) On(name string, h EventerHandler) {
	// Lock
	e.mh.Lock()
	defer e.mh.Unlock()

	// Add handler
	e.hs[name] = append(e.hs[name], h)
}

// Dispatch dispatches a payload for a specific name
func (e *Eventer) Dispatch(name string, payload interface{}) {
	// Lock
	e.mh.Lock()
	defer e.mh.Unlock()

	// No handlers
	hs, ok := e.hs[name]
	if !ok {
		return
	}

	// Loop through handlers
	for _, h := range hs {
		// Capture h as an argument so the func added to the chan refers to
		// this iteration's handler
		func(h EventerHandler) {
			// Add to chan
			e.c.Add(func() {
				h(payload)
			})
		}(h)
	}
}

// Start starts the eventer. It is blocking
func (e *Eventer) Start(ctx context.Context) {
	e.c.Start(ctx)
}

// Stop stops the eventer
func (e *Eventer) Stop() {
	e.c.Stop()
}

// Reset resets the eventer
func (e *Eventer) Reset() {
	e.c.Reset()
}
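
// exampleEventerUsage is a minimal, illustrative sketch (the function name and
// event name are arbitrary): handlers registered with On() are executed
// sequentially by the eventer's inner Chan once Start() is running. With
// ChanAddStrategyBlockWhenStarted, Dispatch() returns only once the handlers
// have run, which makes it safe to stop the eventer right after.
func exampleEventerUsage() {
	e := NewEventer(EventerOptions{Chan: ChanOptions{AddStrategy: ChanAddStrategyBlockWhenStarted}})
	e.On("my.event", func(payload interface{}) {
		fmt.Println("received:", payload)
	})
	go e.Start(context.Background())
	e.Dispatch("my.event", "hello") // blocks until the handler has run
	e.Stop()
}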

// DebugMutex represents an RWMutex capable of logging its actions to ease deadlock debugging
type DebugMutex struct {
	l          CompleteLogger
	lastCaller string
	ll         LoggerLevel
	m          *sync.RWMutex
	name       string
	timeout    time.Duration
}

// DebugMutexOpt represents a debug mutex option
type DebugMutexOpt func(m *DebugMutex)

// DebugMutexWithLockLogging allows logging all mutex locks
func DebugMutexWithLockLogging(ll LoggerLevel) DebugMutexOpt {
	return func(m *DebugMutex) {
		m.ll = ll
	}
}

// DebugMutexWithDeadlockDetection allows detecting deadlock for all mutex locks
func DebugMutexWithDeadlockDetection(timeout time.Duration) DebugMutexOpt {
	return func(m *DebugMutex) {
		m.timeout = timeout
	}
}

// NewDebugMutex creates a new debug mutex
func NewDebugMutex(name string, l StdLogger, opts ...DebugMutexOpt) *DebugMutex {
	m := &DebugMutex{
		l:    AdaptStdLogger(l),
		ll:   LoggerLevelDebug - 1,
		m:    &sync.RWMutex{},
		name: name,
	}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func (m *DebugMutex) caller() (o string) {
	if _, file, line, ok := runtime.Caller(2); ok {
		o = fmt.Sprintf("%s:%d", file, line)
	}
	return
}

func (m *DebugMutex) log(fmt string, args ...interface{}) {
	if m.ll < LoggerLevelDebug {
		return
	}
	m.l.Writef(m.ll, fmt, args...)
}

func (m *DebugMutex) watchTimeout(caller string, fn func()) {
	if m.timeout <= 0 {
		fn()
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
	defer cancel()

	go func() {
		<-ctx.Done()
		if err := ctx.Err(); err != nil && errors.Is(err, context.DeadlineExceeded) {
			m.l.Errorf("astikit: %s mutex timed out at %s with last caller at %s", m.name, caller, m.lastCaller)
		}
	}()

	fn()
}

// Lock write locks the mutex
func (m *DebugMutex) Lock() {
	c := m.caller()
	m.log("astikit: requesting lock for %s at %s", m.name, c)
	m.watchTimeout(c, m.m.Lock)
	m.log("astikit: lock acquired for %s at %s", m.name, c)
	m.lastCaller = c
}

// Unlock write unlocks the mutex
func (m *DebugMutex) Unlock() {
	m.m.Unlock()
	m.log("astikit: unlock executed for %s", m.name)
}

// RLock read locks the mutex
func (m *DebugMutex) RLock() {
	c := m.caller()
	m.log("astikit: requesting rlock for %s at %s", m.name, c)
	m.watchTimeout(c, m.m.RLock)
	m.log("astikit: rlock acquired for %s at %s", m.name, c)
	m.lastCaller = c
}

// RUnlock read unlocks the mutex
func (m *DebugMutex) RUnlock() {
	m.m.RUnlock()
	m.log("astikit: runlock executed for %s", m.name)
}
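
// exampleDebugMutexUsage is a minimal, illustrative sketch (the function name,
// mutex name and timeout are arbitrary): with both options set, every
// lock/unlock is logged at the given level and a lock that waits longer than
// the timeout logs an error pointing at the last successful caller, which is
// usually the deadlock culprit.
func exampleDebugMutexUsage(l StdLogger) {
	m := NewDebugMutex("my.mutex", l,
		DebugMutexWithLockLogging(LoggerLevelDebug),
		DebugMutexWithDeadlockDetection(time.Second))
	m.Lock()
	defer m.Unlock()
	// ... critical section ...
}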

// AtomicDuration represents a time.Duration that can be manipulated safely by
// multiple goroutines
type AtomicDuration struct {
	d time.Duration
	m *sync.Mutex
}

// NewAtomicDuration creates a new AtomicDuration
func NewAtomicDuration(d time.Duration) *AtomicDuration {
	return &AtomicDuration{
		d: d,
		m: &sync.Mutex{},
	}
}

// Add adds a delta to the duration
func (d *AtomicDuration) Add(delta time.Duration) {
	d.m.Lock()
	defer d.m.Unlock()
	d.d += delta
}

// Duration returns the current duration
func (d *AtomicDuration) Duration() time.Duration {
	d.m.Lock()
	defer d.m.Unlock()
	return d.d
}
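
// exampleAtomicDurationUsage is a minimal, illustrative sketch (the function
// name is arbitrary): several goroutines accumulate elapsed time into a single
// AtomicDuration, which is how Chan tracks its work duration stat.
func exampleAtomicDurationUsage() {
	d := NewAtomicDuration(0)
	wg := &sync.WaitGroup{}
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n := time.Now()
			time.Sleep(10 * time.Millisecond) // simulate work
			d.Add(time.Since(n))
		}()
	}
	wg.Wait()
	fmt.Println("total work duration:", d.Duration())
}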