diff --git a/cluster/node/cache.go b/cluster/node/cache.go index bce3db5c..24041f23 100644 --- a/cluster/node/cache.go +++ b/cluster/node/cache.go @@ -4,38 +4,30 @@ import ( "errors" "sync" "time" + + timesrc "github.com/datarhei/core/v16/time" ) -type TimeSource interface { - Now() time.Time -} - -type StdTimeSource struct{} - -func (s *StdTimeSource) Now() time.Time { - return time.Now() -} - type CacheEntry[T any] struct { value T validUntil time.Time } type Cache[T any] struct { - ts TimeSource + ts timesrc.Source lock sync.Mutex entries map[string]CacheEntry[T] lastPurge time.Time } -func NewCache[T any](ts TimeSource) *Cache[T] { +func NewCache[T any](ts timesrc.Source) *Cache[T] { c := &Cache[T]{ ts: ts, entries: map[string]CacheEntry[T]{}, } if c.ts == nil { - c.ts = &StdTimeSource{} + c.ts = ×rc.StdSource{} } c.lastPurge = c.ts.Now() diff --git a/cluster/node/cache_test.go b/cluster/node/cache_test.go index 2c8d7a8f..444f60d8 100644 --- a/cluster/node/cache_test.go +++ b/cluster/node/cache_test.go @@ -4,20 +4,14 @@ import ( "testing" "time" + timesrc "github.com/datarhei/core/v16/time" + "github.com/stretchr/testify/require" ) -type testTimeSource struct { - now time.Time -} - -func (t *testTimeSource) Now() time.Time { - return t.now -} - func TestCache(t *testing.T) { - ts := &testTimeSource{ - now: time.Unix(0, 0), + ts := ×rc.TestSource{ + N: time.Unix(0, 0), } c := NewCache[string](ts) @@ -31,21 +25,21 @@ func TestCache(t *testing.T) { require.NoError(t, err) require.Equal(t, "bar", v) - ts.now = time.Unix(10, 0) + ts.Set(10, 0) v, err = c.Get("foo") require.NoError(t, err) require.Equal(t, "bar", v) - ts.now = time.Unix(11, 0) + ts.Set(11, 0) _, err = c.Get("foo") require.Error(t, err) } func TestCachePurge(t *testing.T) { - ts := &testTimeSource{ - now: time.Unix(0, 0), + ts := ×rc.TestSource{ + N: time.Unix(0, 0), } c := NewCache[string](ts) @@ -56,14 +50,14 @@ func TestCachePurge(t *testing.T) { require.NoError(t, err) require.Equal(t, "bar", v) - ts.now = time.Unix(59, 0) + ts.Set(59, 0) c.Put("foz", "boz", 10*time.Second) _, ok := c.entries["foo"] require.True(t, ok) - ts.now = time.Unix(61, 0) + ts.Set(61, 0) c.Put("foz", "boz", 10*time.Second) diff --git a/ffmpeg/parse/average.go b/ffmpeg/parse/average.go index 8f715ddd..3d633594 100644 --- a/ffmpeg/parse/average.go +++ b/ffmpeg/parse/average.go @@ -3,23 +3,23 @@ package parse import ( "time" - "github.com/prep/average" + "github.com/datarhei/core/v16/math/average" ) type averager struct { - fps *average.SlidingWindow - pps *average.SlidingWindow - bitrate *average.SlidingWindow + fps *average.SMA + pps *average.SMA + bitrate *average.SMA } func (a *averager) init(window, granularity time.Duration) { - a.fps, _ = average.New(window, granularity) - a.pps, _ = average.New(window, granularity) - a.bitrate, _ = average.New(window, granularity) + a.fps, _ = average.NewSMA(window, granularity) + a.pps, _ = average.NewSMA(window, granularity) + a.bitrate, _ = average.NewSMA(window, granularity) } func (a *averager) stop() { - a.fps.Stop() - a.pps.Stop() - a.bitrate.Stop() + a.fps.Reset() + a.pps.Reset() + a.bitrate.Reset() } diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 125e3ee1..b4912af1 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -410,23 +410,15 @@ func (p *parser) Parse(line []byte) uint64 { } } - p.averager.main.fps.Add(int64(p.stats.main.diff.frame)) - p.averager.main.pps.Add(int64(p.stats.main.diff.packet)) - p.averager.main.bitrate.Add(int64(p.stats.main.diff.size) * 8) - - p.progress.ffmpeg.FPS = p.averager.main.fps.Average(p.averager.window) - p.progress.ffmpeg.PPS = p.averager.main.pps.Average(p.averager.window) - p.progress.ffmpeg.Bitrate = p.averager.main.bitrate.Average(p.averager.window) + p.progress.ffmpeg.FPS = p.averager.main.fps.AddAndAverage(float64(p.stats.main.diff.frame)) + p.progress.ffmpeg.PPS = p.averager.main.pps.AddAndAverage(float64(p.stats.main.diff.packet)) + p.progress.ffmpeg.Bitrate = p.averager.main.bitrate.AddAndAverage(float64(p.stats.main.diff.size) * 8) if len(p.averager.input) != 0 && len(p.averager.input) == len(p.progress.ffmpeg.Input) { for i := range p.progress.ffmpeg.Input { - p.averager.input[i].fps.Add(int64(p.stats.input[i].diff.frame)) - p.averager.input[i].pps.Add(int64(p.stats.input[i].diff.packet)) - p.averager.input[i].bitrate.Add(int64(p.stats.input[i].diff.size) * 8) - - p.progress.ffmpeg.Input[i].FPS = p.averager.input[i].fps.Average(p.averager.window) - p.progress.ffmpeg.Input[i].PPS = p.averager.input[i].pps.Average(p.averager.window) - p.progress.ffmpeg.Input[i].Bitrate = p.averager.input[i].bitrate.Average(p.averager.window) + p.progress.ffmpeg.Input[i].FPS = p.averager.input[i].fps.AddAndAverage(float64(p.stats.input[i].diff.frame)) + p.progress.ffmpeg.Input[i].PPS = p.averager.input[i].pps.AddAndAverage(float64(p.stats.input[i].diff.packet)) + p.progress.ffmpeg.Input[i].Bitrate = p.averager.input[i].bitrate.AddAndAverage(float64(p.stats.input[i].diff.size) * 8) if p.collector.IsCollectableIP(p.process.input[i].IP) { p.collector.Activate("") @@ -437,13 +429,9 @@ func (p *parser) Parse(line []byte) uint64 { if len(p.averager.output) != 0 && len(p.averager.output) == len(p.progress.ffmpeg.Output) { for i := range p.progress.ffmpeg.Output { - p.averager.output[i].fps.Add(int64(p.stats.output[i].diff.frame)) - p.averager.output[i].pps.Add(int64(p.stats.output[i].diff.packet)) - p.averager.output[i].bitrate.Add(int64(p.stats.output[i].diff.size) * 8) - - p.progress.ffmpeg.Output[i].FPS = p.averager.output[i].fps.Average(p.averager.window) - p.progress.ffmpeg.Output[i].PPS = p.averager.output[i].pps.Average(p.averager.window) - p.progress.ffmpeg.Output[i].Bitrate = p.averager.output[i].bitrate.Average(p.averager.window) + p.progress.ffmpeg.Output[i].FPS = p.averager.output[i].fps.AddAndAverage(float64(p.stats.output[i].diff.frame)) + p.progress.ffmpeg.Output[i].PPS = p.averager.output[i].pps.AddAndAverage(float64(p.stats.output[i].diff.packet)) + p.progress.ffmpeg.Output[i].Bitrate = p.averager.output[i].bitrate.AddAndAverage(float64(p.stats.output[i].diff.size) * 8) if p.collector.IsCollectableIP(p.process.output[i].IP) { p.collector.Activate("") diff --git a/ffmpeg/parse/parser_test.go b/ffmpeg/parse/parser_test.go index 9caf348d..3e351f48 100644 --- a/ffmpeg/parse/parser_test.go +++ b/ffmpeg/parse/parser_test.go @@ -184,11 +184,11 @@ func TestParserLogHistory(t *testing.T) { require.Equal(t, Progress{ Started: true, Frame: 5968, - FPS: 0, // is calculated with averager + FPS: 5968. / 30, // is calculated with averager Quantizer: 19.4, Size: 453632, Time: d.Seconds(), - Bitrate: 0, // is calculated with averager + Bitrate: 443. * 1024 * 8 / 30, // is calculated with averager Speed: 0.999, Drop: 3522, Dup: 87463, @@ -245,11 +245,11 @@ func TestParserImportLogHistory(t *testing.T) { require.Equal(t, Progress{ Started: true, Frame: 42, - FPS: 0, // is calculated with averager + FPS: 5968. / 30, // is calculated with averager Quantizer: 19.4, Size: 453632, Time: d.Seconds(), - Bitrate: 0, // is calculated with averager + Bitrate: 443. * 1024 * 8 / 30, // is calculated with averager Speed: 0.999, Drop: 3522, Dup: 87463, @@ -312,11 +312,11 @@ func TestParserLogMinimalHistoryLength(t *testing.T) { require.Equal(t, Progress{ Started: true, Frame: 5968, - FPS: 0, // is calculated with averager + FPS: 5968. / 30, // is calculated with averager Quantizer: 19.4, Size: 453632, Time: d.Seconds(), - Bitrate: 0, // is calculated with averager + Bitrate: 443. * 1024 * 8 / 30, // is calculated with averager Speed: 0.999, Drop: 3522, Dup: 87463, @@ -330,11 +330,11 @@ func TestParserLogMinimalHistoryLength(t *testing.T) { require.Equal(t, Progress{ Started: true, Frame: 5968, - FPS: 0, // is calculated with averager + FPS: 5968. / 30, // is calculated with averager Quantizer: 19.4, Size: 453632, Time: d.Seconds(), - Bitrate: 0, // is calculated with averager + Bitrate: 443. * 1024 * 8 / 30, // is calculated with averager Speed: 0.999, Drop: 3522, Dup: 87463, @@ -884,11 +884,11 @@ func TestParserProgressPlayout(t *testing.T) { Coder: "h264", Frame: 7, Keyframe: 1, - FPS: 0, + FPS: 7. / 30, Packet: 11, - PPS: 0, + PPS: 11. / 30, Size: 42, - Bitrate: 0, + Bitrate: 42. * 8 / 30, Pixfmt: "yuvj420p", Quantizer: 0, Width: 1280, @@ -938,11 +938,11 @@ func TestParserProgressPlayout(t *testing.T) { Coder: "libx264", Frame: 7, Keyframe: 1, - FPS: 0, + FPS: 7. / 30, Packet: 0, PPS: 0, Size: 5, - Bitrate: 0, + Bitrate: 5. * 8 / 30, Extradata: 32, Pixfmt: "yuvj420p", Quantizer: 0, @@ -962,11 +962,11 @@ func TestParserProgressPlayout(t *testing.T) { Codec: "h264", Coder: "copy", Frame: 11, - FPS: 0, + FPS: 11. / 30, Packet: 11, - PPS: 0, + PPS: 11. / 30, Size: 231424, - Bitrate: 0, + Bitrate: 231424. * 8 / 30, Pixfmt: "yuvj420p", Quantizer: -1, Width: 1280, @@ -979,12 +979,12 @@ func TestParserProgressPlayout(t *testing.T) { }, Frame: 7, Packet: 0, - FPS: 0, + FPS: 7. / 30, PPS: 0, Quantizer: 0, Size: 231424, Time: 0.56, - Bitrate: 0, + Bitrate: 231424. * 8 / 30, Speed: 0.4, Drop: 0, Dup: 0, @@ -1016,11 +1016,11 @@ func TestParserProgressPlayoutVideo(t *testing.T) { Coder: "h264", Frame: 7, Keyframe: 1, - FPS: 0, + FPS: 7. / 30, Packet: 11, - PPS: 0, + PPS: 11. / 30, Size: 42, - Bitrate: 0, + Bitrate: 42. * 8 / 30, Pixfmt: "yuvj420p", Quantizer: 0, Width: 1280, @@ -1076,11 +1076,11 @@ func TestParserProgressPlayoutVideo(t *testing.T) { Coder: "libx264", Frame: 7, Keyframe: 1, - FPS: 0, + FPS: 7. / 30, Packet: 0, PPS: 0, Size: 5, - Bitrate: 0, + Bitrate: 5. * 8 / 30, Extradata: 32, Pixfmt: "yuvj420p", Quantizer: 0, @@ -1100,11 +1100,11 @@ func TestParserProgressPlayoutVideo(t *testing.T) { Codec: "h264", Coder: "copy", Frame: 11, - FPS: 0, + FPS: 11. / 30, Packet: 11, - PPS: 0, + PPS: 11. / 30, Size: 231424, - Bitrate: 0, + Bitrate: 231424. * 8 / 30, Pixfmt: "yuvj420p", Quantizer: -1, Width: 1280, @@ -1117,12 +1117,12 @@ func TestParserProgressPlayoutVideo(t *testing.T) { }, Frame: 7, Packet: 0, - FPS: 0, + FPS: 7. / 30, PPS: 0, Quantizer: 0, Size: 231424, Time: 0.56, - Bitrate: 0, + Bitrate: 231424. * 8 / 30, Speed: 0.4, Drop: 0, Dup: 0, @@ -1161,11 +1161,11 @@ func TestParserProgressPlayoutAudioVideo(t *testing.T) { Max float64 Average float64 }{25, 25, 25}, - FPS: 0, + FPS: 101. / 30, Packet: 101, - PPS: 0, + PPS: 101. / 30, Size: 530273, - Bitrate: 0, + Bitrate: 530273. * 8 / 30, Pixfmt: "yuv420p", Quantizer: 0, Width: 1280, @@ -1228,11 +1228,11 @@ func TestParserProgressPlayoutAudioVideo(t *testing.T) { Max float64 Average float64 }{43.083, 43.083, 43.083}, - FPS: 0, + FPS: 174. / 30, Packet: 174, - PPS: 0, + PPS: 174. / 30, Size: 713, - Bitrate: 0, + Bitrate: 713. * 8 / 30, Pixfmt: "", Quantizer: 0, Width: 0, @@ -1301,11 +1301,11 @@ func TestParserProgressPlayoutAudioVideo(t *testing.T) { Max float64 Average float64 }{25, 25, 25}, - FPS: 0, + FPS: 101. / 30, Packet: 101, - PPS: 0, + PPS: 101. / 30, Size: 530273, - Bitrate: 0, + Bitrate: 530273. * 8 / 30, Extradata: 0, Pixfmt: "yuv420p", Quantizer: -1, @@ -1333,11 +1333,11 @@ func TestParserProgressPlayoutAudioVideo(t *testing.T) { Max float64 Average float64 }{43.083, 43.083, 43.083}, - FPS: 0, + FPS: 174. / 30, Packet: 174, - PPS: 0, + PPS: 174. / 30, Size: 713, - Bitrate: 0, + Bitrate: 713. * 8 / 30, Pixfmt: "", Quantizer: 0, Width: 0, @@ -1351,12 +1351,12 @@ func TestParserProgressPlayoutAudioVideo(t *testing.T) { }, Frame: 101, Packet: 101, - FPS: 0, - PPS: 0, + FPS: 101. / 30, + PPS: 101. / 30, Quantizer: -1, Size: 530986, Time: 4.3, - Bitrate: 0, + Bitrate: 530986. * 8 / 30, Speed: 1, Drop: 0, Dup: 0, diff --git a/go.mod b/go.mod index 8e8ab4ff..47f22d5c 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,6 @@ require ( github.com/lithammer/shortuuid/v4 v4.0.0 github.com/mattn/go-isatty v0.0.20 github.com/minio/minio-go/v7 v7.0.77 - github.com/prep/average v0.0.0-20200506183628-d26c465f48c3 github.com/prometheus/client_golang v1.20.4 github.com/puzpuzpuz/xsync/v3 v3.4.0 github.com/shirou/gopsutil/v3 v3.24.5 diff --git a/go.sum b/go.sum index 8ea02977..95462ea3 100644 --- a/go.sum +++ b/go.sum @@ -231,8 +231,6 @@ github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prep/average v0.0.0-20200506183628-d26c465f48c3 h1:Y7qCvg282QmlyrVQuL2fgGwebuw7zvfnRym09r+dUGc= -github.com/prep/average v0.0.0-20200506183628-d26c465f48c3/go.mod h1:0ZE5gcyWKS151WBDIpmLshHY0l+3edpuKnBUWVVbWKk= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= diff --git a/math/average/sma.go b/math/average/sma.go new file mode 100644 index 00000000..d31e0cd1 --- /dev/null +++ b/math/average/sma.go @@ -0,0 +1,118 @@ +package average + +import ( + "container/ring" + "errors" + gotime "time" + + "github.com/datarhei/core/v16/time" +) + +type SMA struct { + ts time.Source + window int64 + granularity int64 + size int + last int64 + samples *ring.Ring +} + +var ErrWindow = errors.New("window size must be positive") +var ErrGranularity = errors.New("granularity must be positive") +var ErrMultiplier = errors.New("window size has to be a multiplier of the granularity size") + +func NewSMA(window, granularity gotime.Duration) (*SMA, error) { + if window <= 0 { + return nil, ErrWindow + } + + if granularity <= 0 { + return nil, ErrGranularity + } + + if window <= granularity || window%granularity != 0 { + return nil, ErrMultiplier + } + + s := &SMA{ + ts: &time.StdSource{}, + window: window.Nanoseconds(), + granularity: granularity.Nanoseconds(), + } + + s.init() + + return s, nil +} + +func (s *SMA) init() { + s.size = int(s.window / s.granularity) + s.samples = ring.New(s.size) + + s.Reset() + + now := s.ts.Now().UnixNano() + s.last = now - now%s.granularity +} + +func (s *SMA) Add(v float64) { + now := s.ts.Now().UnixNano() + now -= now % s.granularity + + n := (now - s.last) / s.granularity + + if n >= int64(s.samples.Len()) { + // zero everything + s.Reset() + } else { + for i := n; i > 0; i-- { + s.samples = s.samples.Next() + s.samples.Value = float64(0) + } + } + + s.samples.Value = s.samples.Value.(float64) + v + + s.last = now +} + +func (s *SMA) AddAndAverage(v float64) float64 { + s.Add(v) + + total := float64(0) + + s.samples.Do(func(v any) { + total += v.(float64) + }) + + return total / float64(s.samples.Len()) +} + +func (s *SMA) Average() float64 { + total, samplecount := s.Total() + + return total / float64(samplecount) +} + +func (s *SMA) Reset() { + n := s.samples.Len() + + // Initialize the ring buffer with 0 values. + for i := 0; i < n; i++ { + s.samples.Value = float64(0) + s.samples = s.samples.Next() + } +} + +func (s *SMA) Total() (float64, int) { + // Propagate the ringbuffer + s.Add(0) + + total := float64(0) + + s.samples.Do(func(v any) { + total += v.(float64) + }) + + return total, s.samples.Len() +} diff --git a/math/average/sma_test.go b/math/average/sma_test.go new file mode 100644 index 00000000..4e650221 --- /dev/null +++ b/math/average/sma_test.go @@ -0,0 +1,301 @@ +package average + +import ( + "testing" + "time" + + timesrc "github.com/datarhei/core/v16/time" + "github.com/stretchr/testify/require" +) + +func TestNewSMA(t *testing.T) { + _, err := NewSMA(time.Second, time.Second) + require.Error(t, err) + require.ErrorIs(t, err, ErrMultiplier) + + _, err = NewSMA(time.Second, 2*time.Second) + require.Error(t, err) + require.ErrorIs(t, err, ErrMultiplier) + + _, err = NewSMA(3*time.Second, 2*time.Second) + require.Error(t, err) + require.ErrorIs(t, err, ErrMultiplier) + + _, err = NewSMA(0, time.Second) + require.Error(t, err) + require.ErrorIs(t, err, ErrWindow) + + _, err = NewSMA(time.Second, 0) + require.Error(t, err) + require.ErrorIs(t, err, ErrGranularity) + + sme, err := NewSMA(10*time.Second, time.Second) + require.NoError(t, err) + require.NotNil(t, sme) +} + +func TestAddSMA(t *testing.T) { + ts := ×rc.TestSource{ + N: time.Unix(0, 0), + } + + sme := &SMA{ + ts: ts, + window: time.Second.Nanoseconds(), + granularity: time.Millisecond.Nanoseconds(), + } + sme.init() + + sme.Add(42) + + total, samplecount := sme.Total() + require.Equal(t, float64(42), total) + require.Equal(t, int(time.Second/time.Millisecond), samplecount) + + sme.Add(5) + + total, samplecount = sme.Total() + require.Equal(t, float64(47), total) + require.Equal(t, int(time.Second/time.Millisecond), samplecount) + + ts.Set(5, 0) + + total, samplecount = sme.Total() + require.Equal(t, float64(0), total) + require.Equal(t, int(time.Second/time.Millisecond), samplecount) +} + +func TestAverageSMA(t *testing.T) { + ts := ×rc.TestSource{ + N: time.Unix(0, 0), + } + + sme := &SMA{ + ts: ts, + window: time.Second.Nanoseconds(), + granularity: time.Millisecond.Nanoseconds(), + } + sme.init() + + sme.Add(42) + + avg := sme.Average() + require.Equal(t, 42.0/1000, avg) + + sme.Add(5) + + avg = sme.Average() + require.Equal(t, 47.0/1000, avg) + + ts.Set(5, 0) + + avg = sme.Average() + require.Equal(t, .0/1000, avg) +} + +func TestAddAndAverageSMA(t *testing.T) { + ts := ×rc.TestSource{ + N: time.Unix(0, 0), + } + + sme := &SMA{ + ts: ts, + window: time.Second.Nanoseconds(), + granularity: time.Millisecond.Nanoseconds(), + } + sme.init() + + avg := sme.AddAndAverage(42) + require.Equal(t, 42.0/1000, avg) + + avg = sme.AddAndAverage(5) + require.Equal(t, 47.0/1000, avg) + + ts.Set(5, 0) + + avg = sme.Average() + require.Equal(t, .0/1000, avg) +} + +func TestAverageSeriesSMA(t *testing.T) { + ts := ×rc.TestSource{ + N: time.Unix(0, 0), + } + + sme := &SMA{ + ts: ts, + window: 10 * time.Second.Nanoseconds(), + granularity: time.Second.Nanoseconds(), + } + sme.init() + + sme.Add(42) // [42, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(1, 0) + + sme.Add(5) // [5, 42, 0, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(2, 0) + + sme.Add(18) // [18, 5, 42, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(3, 0) + + sme.Add(47) // [47, 18, 5, 42, 0, 0, 0, 0, 0, 0] + + ts.Set(4, 0) + + sme.Add(92) // [92, 47, 18, 5, 42, 0, 0, 0, 0, 0] + + ts.Set(5, 0) + + sme.Add(2) // [2, 92, 47, 18, 5, 42, 0, 0, 0, 0] + + ts.Set(6, 0) + + sme.Add(75) // [75, 2, 92, 47, 18, 5, 42, 0, 0, 0] + + ts.Set(7, 0) + + sme.Add(33) // [33, 75, 2, 92, 47, 18, 5, 42, 0, 0] + + ts.Set(8, 0) + + sme.Add(89) // [89, 33, 75, 2, 92, 47, 18, 5, 42, 0] + + ts.Set(9, 0) + + sme.Add(12) // [12, 89, 33, 75, 2, 92, 47, 18, 5, 42] + + avg := sme.Average() + require.Equal(t, (12+89+33+75+2+92+47+18+5+42)/10., avg) + + ts.Set(10, 0) + + avg = sme.Average() + require.Equal(t, (12+89+33+75+2+92+47+18+5)/10., avg) + + ts.Set(15, 0) + + avg = sme.Average() + require.Equal(t, (12+89+33+75)/10., avg) + + ts.Set(19, 0) + + avg = sme.Average() + require.Equal(t, (0)/10., avg) +} + +func TestResetSMA(t *testing.T) { + ts := ×rc.TestSource{ + N: time.Unix(0, 0), + } + + sme := &SMA{ + ts: ts, + window: 10 * time.Second.Nanoseconds(), + granularity: time.Second.Nanoseconds(), + } + sme.init() + + sme.Add(42) // [42, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(1, 0) + + sme.Add(5) // [5, 42, 0, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(2, 0) + + sme.Add(18) // [18, 5, 42, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(3, 0) + + sme.Add(47) // [47, 18, 5, 42, 0, 0, 0, 0, 0, 0] + + ts.Set(4, 0) + + sme.Add(92) // [92, 47, 18, 5, 42, 0, 0, 0, 0, 0] + + ts.Set(5, 0) + + sme.Add(2) // [2, 92, 47, 18, 5, 42, 0, 0, 0, 0] + + ts.Set(6, 0) + + sme.Add(75) // [75, 2, 92, 47, 18, 5, 42, 0, 0, 0] + + ts.Set(7, 0) + + sme.Add(33) // [33, 75, 2, 92, 47, 18, 5, 42, 0, 0] + + ts.Set(8, 0) + + sme.Add(89) // [89, 33, 75, 2, 92, 47, 18, 5, 42, 0] + + ts.Set(9, 0) + + sme.Add(12) // [12, 89, 33, 75, 2, 92, 47, 18, 5, 42] + + avg := sme.Average() + require.Equal(t, (12+89+33+75+2+92+47+18+5+42)/10., avg) + + sme.Reset() + + avg = sme.Average() + require.Equal(t, 0/10., avg) +} + +func TestTotalSMA(t *testing.T) { + ts := ×rc.TestSource{ + N: time.Unix(0, 0), + } + + sme := &SMA{ + ts: ts, + window: 10 * time.Second.Nanoseconds(), + granularity: time.Second.Nanoseconds(), + } + sme.init() + + sme.Add(42) // [42, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(1, 0) + + sme.Add(5) // [5, 42, 0, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(2, 0) + + sme.Add(18) // [18, 5, 42, 0, 0, 0, 0, 0, 0, 0] + + ts.Set(3, 0) + + sme.Add(47) // [47, 18, 5, 42, 0, 0, 0, 0, 0, 0] + + ts.Set(4, 0) + + sme.Add(92) // [92, 47, 18, 5, 42, 0, 0, 0, 0, 0] + + ts.Set(5, 0) + + sme.Add(2) // [2, 92, 47, 18, 5, 42, 0, 0, 0, 0] + + ts.Set(6, 0) + + sme.Add(75) // [75, 2, 92, 47, 18, 5, 42, 0, 0, 0] + + ts.Set(7, 0) + + sme.Add(33) // [33, 75, 2, 92, 47, 18, 5, 42, 0, 0] + + ts.Set(8, 0) + + sme.Add(89) // [89, 33, 75, 2, 92, 47, 18, 5, 42, 0] + + ts.Set(9, 0) + + sme.Add(12) // [12, 89, 33, 75, 2, 92, 47, 18, 5, 42] + + total, nsamples := sme.Total() + require.Equal(t, float64(12+89+33+75+2+92+47+18+5+42), total) + require.Equal(t, 10, nsamples) +} diff --git a/session/collector.go b/session/collector.go index 1dd0489d..98eda19b 100644 --- a/session/collector.go +++ b/session/collector.go @@ -9,9 +9,8 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/math/average" "github.com/datarhei/core/v16/net" - - "github.com/prep/average" ) // Session represents an active session @@ -239,8 +238,8 @@ type collector struct { maxTxBitrate float64 maxSessions uint64 - rxBitrate *average.SlidingWindow - txBitrate *average.SlidingWindow + rxBitrate *average.SMA + txBitrate *average.SMA collectHistory bool history history @@ -410,8 +409,8 @@ func (c *collector) start() { c.running = true - c.rxBitrate, _ = average.New(averageWindow, averageGranularity) - c.txBitrate, _ = average.New(averageWindow, averageGranularity) + c.rxBitrate, _ = average.NewSMA(averageWindow, averageGranularity) + c.txBitrate, _ = average.NewSMA(averageWindow, averageGranularity) } func (c *collector) stop() { @@ -648,7 +647,7 @@ func (c *collector) Ingress(id string, size int64) { } if sess.Ingress(size) { - c.rxBitrate.Add(size * 8) + c.rxBitrate.Add(float64(size) * 8) c.rxBytes += uint64(size) } } @@ -667,7 +666,7 @@ func (c *collector) Egress(id string, size int64) { } if sess.Egress(size) { - c.txBitrate.Add(size * 8) + c.txBitrate.Add(float64(size) * 8) c.txBytes += uint64(size) } } @@ -709,11 +708,11 @@ func (c *collector) IsSessionsExceeded() bool { } func (c *collector) IngressBitrate() float64 { - return c.rxBitrate.Average(averageWindow) + return c.rxBitrate.Average() } func (c *collector) EgressBitrate() float64 { - return c.txBitrate.Average(averageWindow) + return c.txBitrate.Average() } func (c *collector) MaxIngressBitrate() float64 { diff --git a/session/session.go b/session/session.go index d727de7f..e943cea3 100644 --- a/session/session.go +++ b/session/session.go @@ -6,7 +6,7 @@ import ( "time" "github.com/datarhei/core/v16/log" - "github.com/prep/average" + "github.com/datarhei/core/v16/math/average" ) type session struct { @@ -27,10 +27,10 @@ type session struct { timeout time.Duration callback func(*session) - rxBitrate *average.SlidingWindow + rxBitrate *average.SMA rxBytes uint64 - txBitrate *average.SlidingWindow + txBitrate *average.SMA txBytes uint64 tickerStop context.CancelFunc @@ -59,8 +59,8 @@ func (s *session) Init(id, reference string, closeCallback func(*session), inact s.peer = "" s.extra = map[string]interface{}{} - s.rxBitrate, _ = average.New(averageWindow, averageGranularity) - s.txBitrate, _ = average.New(averageWindow, averageGranularity) + s.rxBitrate, _ = average.NewSMA(averageWindow, averageGranularity) + s.txBitrate, _ = average.NewSMA(averageWindow, averageGranularity) s.topRxBitrate = 0.0 s.topTxBitrate = 0.0 @@ -105,8 +105,8 @@ func (s *session) close() { s.tickerStop = nil } - s.rxBitrate.Stop() - s.txBitrate.Stop() + s.rxBitrate.Reset() + s.txBitrate.Reset() go s.callback(s) } @@ -157,10 +157,10 @@ func (s *session) Ingress(size int64) bool { s.stale.Stop() s.stale.Reset(s.timeout) - s.rxBitrate.Add(size * 8) + s.rxBitrate.Add(float64(size) * 8) s.rxBytes += uint64(size) - bitrate := s.rxBitrate.Average(averageWindow) + bitrate := s.rxBitrate.Average() if bitrate > s.topRxBitrate { s.topRxBitrate = bitrate } @@ -183,10 +183,10 @@ func (s *session) Egress(size int64) bool { s.stale.Stop() s.stale.Reset(s.timeout) - s.txBitrate.Add(size * 8) + s.txBitrate.Add(float64(size) * 8) s.txBytes += uint64(size) - bitrate := s.txBitrate.Average(averageWindow) + bitrate := s.txBitrate.Average() if bitrate > s.topTxBitrate { s.topTxBitrate = bitrate } @@ -199,11 +199,11 @@ func (s *session) Egress(size int64) bool { } func (s *session) RxBitrate() float64 { - return s.rxBitrate.Average(averageWindow) + return s.rxBitrate.Average() } func (s *session) TxBitrate() float64 { - return s.txBitrate.Average(averageWindow) + return s.txBitrate.Average() } func (s *session) TopRxBitrate() float64 { diff --git a/time/source.go b/time/source.go new file mode 100644 index 00000000..83eff7dd --- /dev/null +++ b/time/source.go @@ -0,0 +1,25 @@ +package time + +import "time" + +type Source interface { + Now() time.Time +} + +type StdSource struct{} + +func (s *StdSource) Now() time.Time { + return time.Now() +} + +type TestSource struct { + N time.Time +} + +func (t *TestSource) Now() time.Time { + return t.N +} + +func (t *TestSource) Set(sec int64, nsec int64) { + t.N = time.Unix(sec, nsec) +} diff --git a/vendor/github.com/prep/average/.travis.yml b/vendor/github.com/prep/average/.travis.yml deleted file mode 100644 index 9fc8e3b1..00000000 --- a/vendor/github.com/prep/average/.travis.yml +++ /dev/null @@ -1,28 +0,0 @@ -language: go - -go: - - 1.9 - - master - -# Skip the install step. Don't `go get` dependencies. Only build with the -# code in vendor/ -install: true - -matrix: - # It's ok if our code fails on unstable development versions of Go. - allow_failures: - - go: master - # Don't wait for tip tests to finish. Mark the test run green if the - # tests pass on the stable versions of Go. - fast_finish: true - -notifications: - email: false - -before_script: - - GO_FILES=$(find . -iname '*.go' -type f | grep -v /vendor/) - -script: - - test -z $(gofmt -s -l $GO_FILES) - - go tool vet . - - go test -v -race ./... diff --git a/vendor/github.com/prep/average/LICENSE b/vendor/github.com/prep/average/LICENSE deleted file mode 100644 index 6a66aea5..00000000 --- a/vendor/github.com/prep/average/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/prep/average/README.md b/vendor/github.com/prep/average/README.md deleted file mode 100644 index 60dbcc50..00000000 --- a/vendor/github.com/prep/average/README.md +++ /dev/null @@ -1,42 +0,0 @@ -average -[![TravisCI](https://travis-ci.org/prep/average.svg?branch=master)](https://travis-ci.org/prep/average.svg?branch=master) -[![Go Report Card](https://goreportcard.com/badge/github.com/prep/average)](https://goreportcard.com/report/github.com/prep/average) -[![GoDoc](https://godoc.org/github.com/prep/average?status.svg)](https://godoc.org/github.com/prep/average) -======= -This stupidly named Go package contains a single struct that is used to implement counters on a sliding time window. - -Usage ------ -```go - -import ( - "fmt" - - "github.com/prep/average" -) - -func main() { - // Create a SlidingWindow that has a window of 15 minutes, with a - // granulity of 1 minute. - sw := average.MustNew(15 * time.Minute, time.Minute) - defer sw.Stop() - - // Do some work. - sw.Add(15) - // Do some more work. - sw.Add(22) - // Do even more work. - sw.Add(22) - - fmt.Printf("Average of last 1m: %f\n", sw.Average(time.Minute) - fmt.Printf("Average of last 5m: %f\n", sw.Average(5 * time.Minute) - fmt.Printf("Average of last 15m: %f\n\n", sw.Average(15 * time.Minute) - - total, numSamples := sw.Total(15 * time.Minute) - fmt.Printf("Counter has a total of %d over %d samples", total, numSamples) -} -``` - -License -------- -This software is created for MessageBird B.V. and distributed under the BSD-style license found in the LICENSE file. diff --git a/vendor/github.com/prep/average/slidingwindow.go b/vendor/github.com/prep/average/slidingwindow.go deleted file mode 100644 index 793422dd..00000000 --- a/vendor/github.com/prep/average/slidingwindow.go +++ /dev/null @@ -1,142 +0,0 @@ -// Package average implements sliding time window. -package average - -import ( - "errors" - "sync" - "time" -) - -// SlidingWindow provides a sliding time window with a custom size and -// granularity to store int64 counters. This can be used to determine the total -// or unweighted mean average of a subset of the window size. -type SlidingWindow struct { - window time.Duration - granularity time.Duration - samples []int64 - pos int - size int - stopOnce sync.Once - stopC chan struct{} - sync.RWMutex -} - -// MustNew returns a new SlidingWindow, but panics if an error occurs. -func MustNew(window, granularity time.Duration) *SlidingWindow { - sw, err := New(window, granularity) - if err != nil { - panic(err.Error()) - } - - return sw -} - -// New returns a new SlidingWindow. -func New(window, granularity time.Duration) (*SlidingWindow, error) { - if window == 0 { - return nil, errors.New("window cannot be 0") - } - if granularity == 0 { - return nil, errors.New("granularity cannot be 0") - } - if window <= granularity || window%granularity != 0 { - return nil, errors.New("window size has to be a multiplier of the granularity size") - } - - sw := &SlidingWindow{ - window: window, - granularity: granularity, - samples: make([]int64, int(window/granularity)), - stopC: make(chan struct{}), - } - - go sw.shifter() - return sw, nil -} - -func (sw *SlidingWindow) shifter() { - ticker := time.NewTicker(sw.granularity) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - sw.Lock() - if sw.pos = sw.pos + 1; sw.pos >= len(sw.samples) { - sw.pos = 0 - } - sw.samples[sw.pos] = 0 - if sw.size < len(sw.samples) { - sw.size++ - } - sw.Unlock() - - case <-sw.stopC: - return - } - } -} - -// Add increments the value of the current sample. -func (sw *SlidingWindow) Add(v int64) { - sw.Lock() - sw.samples[sw.pos] += v - sw.Unlock() -} - -// Average returns the unweighted mean of the specified window. -func (sw *SlidingWindow) Average(window time.Duration) float64 { - total, sampleCount := sw.Total(window) - if sampleCount == 0 { - return 0 - } - - return float64(total) / float64(sampleCount) -} - -// Reset the samples in this sliding time window. -func (sw *SlidingWindow) Reset() { - sw.Lock() - defer sw.Unlock() - - sw.pos, sw.size = 0, 0 - for i := range sw.samples { - sw.samples[i] = 0 - } -} - -// Stop the shifter of this sliding time window. A stopped SlidingWindow cannot -// be started again. -func (sw *SlidingWindow) Stop() { - sw.stopOnce.Do(func() { - sw.stopC <- struct{}{} - }) -} - -// Total returns the sum of all values over the specified window, as well as -// the number of samples. -func (sw *SlidingWindow) Total(window time.Duration) (int64, int) { - if window > sw.window { - window = sw.window - } - - sampleCount := int(window / sw.granularity) - if sampleCount > sw.size { - sampleCount = sw.size - } - - sw.RLock() - defer sw.RUnlock() - - var total int64 - for i := 1; i <= sampleCount; i++ { - pos := sw.pos - i - if pos < 0 { - pos += len(sw.samples) - } - - total += sw.samples[pos] - } - - return total, sampleCount -} diff --git a/vendor/modules.txt b/vendor/modules.txt index b4966dc5..792d3807 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -327,9 +327,6 @@ github.com/pmezard/go-difflib/difflib # github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 ## explicit; go 1.14 github.com/power-devops/perfstat -# github.com/prep/average v0.0.0-20200506183628-d26c465f48c3 -## explicit -github.com/prep/average # github.com/prometheus/client_golang v1.20.4 ## explicit; go 1.20 github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil