Updated stats

This commit is contained in:
Quentin Renard
2022-11-16 14:03:44 +01:00
parent ccbf133ca7
commit 67a52febf7
3 changed files with 141 additions and 186 deletions

219
stat.go
View File

@@ -38,12 +38,13 @@ type StatMetadata struct {
// StatValuer represents a stat valuer
type StatValuer interface {
Value() interface{}
Value(delta time.Duration) interface{}
}
// StatValuerOverTime represents a stat valuer over time
type StatValuerOverTime interface {
Value(delta time.Duration) interface{}
type StatValuerFunc func(d time.Duration) interface{}
func (f StatValuerFunc) Value(d time.Duration) interface{} {
return f(d)
}
// StatValue represents a stat value
@@ -104,8 +105,6 @@ func (s *Stater) Start(ctx context.Context) {
// Get value
var v interface{}
if h, ok := o.Valuer.(StatValuer); ok {
v = h.Value()
} else if h, ok := o.Valuer.(StatValuerOverTime); ok {
v = h.Value(delta)
} else {
continue
@@ -153,149 +152,81 @@ func (s *Stater) DelStats(os ...StatOptions) {
}
}
type durationStatOverTime struct {
d time.Duration
fn func(d, delta time.Duration) interface{}
m *sync.Mutex // Locks isStarted
lastBeginAt time.Time
type AtomicUint64RateStat struct {
last *uint64
v *uint64
}
func newDurationStatOverTime(fn func(d, delta time.Duration) interface{}) *durationStatOverTime {
return &durationStatOverTime{
fn: fn,
m: &sync.Mutex{},
func NewAtomicUint64RateStat(v *uint64) *AtomicUint64RateStat {
return &AtomicUint64RateStat{v: v}
}
func (s *AtomicUint64RateStat) Value(d time.Duration) interface{} {
current := atomic.LoadUint64(s.v)
defer func() { s.last = &current }()
if d <= 0 {
return 0.0
}
var last uint64
if s.last != nil {
last = *s.last
}
return float64(current-last) / d.Seconds()
}
type AtomicDurationPercentageStat struct {
d *AtomicDuration
last *time.Duration
}
func NewAtomicDurationPercentageStat(d *AtomicDuration) *AtomicDurationPercentageStat {
return &AtomicDurationPercentageStat{d: d}
}
func (s *AtomicDurationPercentageStat) Value(d time.Duration) interface{} {
current := s.d.Duration()
defer func() { s.last = &current }()
if d <= 0 {
return 0.0
}
var last time.Duration
if s.last != nil {
last = *s.last
}
return float64(current-last) / float64(d) * 100
}
type AtomicDurationAvgStat struct {
count *uint64
d *AtomicDuration
last *time.Duration
lastCount *uint64
}
func NewAtomicDurationAvgStat(d *AtomicDuration, count *uint64) *AtomicDurationAvgStat {
return &AtomicDurationAvgStat{
count: count,
d: d,
}
}
func (s *durationStatOverTime) Begin() {
s.m.Lock()
defer s.m.Unlock()
s.lastBeginAt = now()
}
func (s *durationStatOverTime) End() {
s.m.Lock()
defer s.m.Unlock()
s.d += now().Sub(s.lastBeginAt)
s.lastBeginAt = time.Time{}
}
func (s *durationStatOverTime) Value(delta time.Duration) (o interface{}) {
// Lock
s.m.Lock()
defer s.m.Unlock()
// Get current values
n := now()
d := s.d
// Recording is still in process
if !s.lastBeginAt.IsZero() {
d += n.Sub(s.lastBeginAt)
s.lastBeginAt = n
func (s *AtomicDurationAvgStat) Value(_ time.Duration) interface{} {
current := s.d.Duration()
currentCount := atomic.LoadUint64(s.count)
defer func() {
s.last = &current
s.lastCount = &currentCount
}()
var last time.Duration
var lastCount uint64
if s.last != nil {
last = *s.last
}
// Compute stat
o = s.fn(d, delta)
s.d = 0
return
}
// DurationPercentageStat is an object capable of computing the percentage of time some work is taking per second
type DurationPercentageStat struct {
*durationStatOverTime
}
// NewDurationPercentageStat creates a new duration percentage stat
func NewDurationPercentageStat() *DurationPercentageStat {
return &DurationPercentageStat{durationStatOverTime: newDurationStatOverTime(func(d, delta time.Duration) interface{} {
if delta == 0 {
return 0
}
return float64(d) / float64(delta) * 100
})}
}
type counterStatOverTime struct {
c float64
fn func(c, t float64, delta time.Duration) interface{}
m *sync.Mutex // Locks isStarted
t float64
}
func newCounterStatOverTime(fn func(c, t float64, delta time.Duration) interface{}) *counterStatOverTime {
return &counterStatOverTime{
fn: fn,
m: &sync.Mutex{},
if s.lastCount != nil {
lastCount = *s.lastCount
}
}
func (s *counterStatOverTime) Add(delta float64) {
s.m.Lock()
defer s.m.Unlock()
s.c += delta
s.t++
}
func (s *counterStatOverTime) Value(delta time.Duration) interface{} {
s.m.Lock()
defer s.m.Unlock()
c := s.c
t := s.t
s.c = 0
s.t = 0
return s.fn(c, t, delta)
}
// CounterAvgStat is an object capable of computing the average value of a counter
type CounterAvgStat struct {
*counterStatOverTime
}
// NewCounterAvgStat creates a new counter avg stat
func NewCounterAvgStat() *CounterAvgStat {
return &CounterAvgStat{counterStatOverTime: newCounterStatOverTime(func(c, t float64, delta time.Duration) interface{} {
if t == 0 {
return 0
}
return c / t
})}
}
// CounterRateStat is an object capable of computing the average value of a counter per second
type CounterRateStat struct {
*counterStatOverTime
}
// NewCounterRateStat creates a new counter rate stat
func NewCounterRateStat() *CounterRateStat {
return &CounterRateStat{counterStatOverTime: newCounterStatOverTime(func(c, t float64, delta time.Duration) interface{} {
if delta.Seconds() == 0 {
return 0
}
return c / delta.Seconds()
})}
}
// CounterStat is an object capable of computing a counter that never gets reset
type CounterStat struct {
c float64
m *sync.Mutex
}
// NewCounterStat creates a new counter stat
func NewCounterStat() *CounterStat {
return &CounterStat{m: &sync.Mutex{}}
}
func (s *CounterStat) Add(delta float64) {
s.m.Lock()
defer s.m.Unlock()
s.c += delta
}
func (s *CounterStat) Value() interface{} {
s.m.Lock()
defer s.m.Unlock()
return s.c
if currentCount-lastCount <= 0 {
return time.Duration(0)
}
return time.Duration(float64(current-last) / float64(currentCount-lastCount))
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -23,17 +24,19 @@ func TestStater(t *testing.T) {
}
// Add stats
v1 := NewCounterRateStat()
var u1 uint64
v1 := NewAtomicUint64RateStat(&u1)
m1 := &StatMetadata{Description: "1"}
o1 := StatOptions{Metadata: m1, Valuer: v1}
v2 := NewDurationPercentageStat()
d2 := NewAtomicDuration(0)
v2 := NewAtomicDurationPercentageStat(d2)
m2 := &StatMetadata{Description: "2"}
o2 := StatOptions{Metadata: m2, Valuer: v2}
v3 := NewCounterAvgStat()
d3 := NewAtomicDuration(0)
v3 := NewAtomicDurationAvgStat(d3, &u1)
m3 := &StatMetadata{Description: "3"}
o3 := StatOptions{Metadata: m3, Valuer: v3}
v4 := NewCounterStat()
v4.Add(1)
v4 := StatValuerFunc(func(d time.Duration) interface{} { return 42 })
m4 := &StatMetadata{Description: "4"}
o4 := StatOptions{Metadata: m4, Valuer: v4}
@@ -48,19 +51,12 @@ func TestStater(t *testing.T) {
c++
switch c {
case 1:
v1.Add(10)
mn.Lock()
nowV = time.Unix(0, 0)
mn.Unlock()
v2.Begin()
atomic.AddUint64(&u1, 10)
d2.Add(4 * time.Second)
d3.Add(10 * time.Second)
mn.Lock()
nowV = time.Unix(5, 0)
mn.Unlock()
v2.End()
v3.Add(10)
v3.Add(20)
v3.Add(30)
v4.Add(1)
case 2:
ss = stats
cancel()
@@ -73,9 +69,9 @@ func TestStater(t *testing.T) {
defer s.Stop()
for _, e := range []StatValue{
{StatMetadata: m1, Value: 2.0},
{StatMetadata: m2, Value: 100.0},
{StatMetadata: m3, Value: 20.0},
{StatMetadata: m4, Value: 2.0},
{StatMetadata: m2, Value: 80.0},
{StatMetadata: m3, Value: time.Second},
{StatMetadata: m4, Value: 42},
} {
found := false
for _, s := range ss {

76
sync.go
View File

@@ -31,15 +31,15 @@ const (
// in which adding new funcs is blocking
// Check out ChanOptions for detailed options
type Chan struct {
cancel context.CancelFunc
c *sync.Cond
ctx context.Context
fs []func()
mc *sync.Mutex // Locks ctx
mf *sync.Mutex // Locks fs
o ChanOptions
running uint32
statWorkRatio *DurationPercentageStat
cancel context.CancelFunc
c *sync.Cond
ctx context.Context
fs []func()
mc *sync.Mutex // Locks ctx
mf *sync.Mutex // Locks fs
o ChanOptions
running uint32
statWorkDuration *AtomicDuration
}
// ChanOptions are Chan options
@@ -59,10 +59,11 @@ type ChanOptions struct {
// NewChan creates a new Chan
func NewChan(o ChanOptions) *Chan {
return &Chan{
c: sync.NewCond(&sync.Mutex{}),
mc: &sync.Mutex{},
mf: &sync.Mutex{},
o: o,
c: sync.NewCond(&sync.Mutex{}),
mc: &sync.Mutex{},
mf: &sync.Mutex{},
o: o,
statWorkDuration: NewAtomicDuration(0),
}
}
@@ -126,13 +127,9 @@ func (c *Chan) Start(ctx context.Context) {
c.mf.Unlock()
// Execute func
if c.statWorkRatio != nil {
c.statWorkRatio.Begin()
}
n := time.Now()
fn()
if c.statWorkRatio != nil {
c.statWorkRatio.End()
}
c.statWorkDuration.Add(time.Since(n))
// Remove first func
c.mf.Lock()
@@ -202,11 +199,18 @@ func (c *Chan) Reset() {
c.fs = []func(){}
}
// ChanStats represents the chan stats
type ChanStats struct {
WorkDuration time.Duration
}
// Stats returns the chan stats
func (c *Chan) Stats() []StatOptions {
if c.statWorkRatio == nil {
c.statWorkRatio = NewDurationPercentageStat()
}
func (c *Chan) Stats() ChanStats {
return ChanStats{WorkDuration: c.statWorkDuration.Duration()}
}
// StatOptions returns the chan stat options
func (c *Chan) StatOptions() []StatOptions {
return []StatOptions{
{
Metadata: &StatMetadata{
@@ -215,7 +219,7 @@ func (c *Chan) Stats() []StatOptions {
Name: StatNameWorkRatio,
Unit: "%",
},
Valuer: c.statWorkRatio,
Valuer: NewAtomicDurationPercentageStat(c.statWorkDuration),
},
}
}
@@ -487,3 +491,27 @@ func (m *RWMutex) IsDeadlocked(timeout time.Duration) (bool, string) {
<-ctx.Done()
return errors.Is(ctx.Err(), context.DeadlineExceeded), m.c
}
type AtomicDuration struct {
d time.Duration
m *sync.Mutex
}
func NewAtomicDuration(d time.Duration) *AtomicDuration {
return &AtomicDuration{
d: d,
m: &sync.Mutex{},
}
}
func (d *AtomicDuration) Add(delta time.Duration) {
d.m.Lock()
defer d.m.Unlock()
d.d += delta
}
func (d *AtomicDuration) Duration() time.Duration {
d.m.Lock()
defer d.m.Unlock()
return d.d
}