Added allocated frames + packets stats

This commit is contained in:
Quentin Renard
2022-10-11 14:42:33 +02:00
parent a344829c75
commit c687113349
18 changed files with 94 additions and 72 deletions

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.13
require (
github.com/BurntSushi/toml v0.3.1
github.com/asticode/go-astiav v0.6.0
github.com/asticode/go-astikit v0.31.0
github.com/asticode/go-astikit v0.33.0
github.com/shirou/gopsutil/v3 v3.21.10
github.com/stretchr/testify v1.7.0
)

4
go.sum
View File

@@ -5,8 +5,8 @@ github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9
github.com/asticode/go-astiav v0.6.0 h1:OEizrERY5Aj+H8X+479v9Lu6ipdYgSZyxU79hCM5JpY=
github.com/asticode/go-astiav v0.6.0/go.mod h1:phvUnSSlV91S/PELeLkDisYiRLOssxWOsj4oDrqM/54=
github.com/asticode/go-astikit v0.28.2/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astikit v0.31.0 h1:JMQqIIXvvaoXCzoIcilkCiTOszezSfC5zncfjPRGld4=
github.com/asticode/go-astikit v0.31.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astikit v0.33.0 h1:FG2b8+jmEXRpdS4YZ/a1e9nsI3qWUKhAf4OBWALgYg4=
github.com/asticode/go-astikit v0.33.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=

View File

@@ -104,24 +104,26 @@ func (d *Decoder) addStats() {
// Get stats
ss := d.c.Stats()
ss = append(ss, d.d.stats()...)
ss = append(ss, d.fp.stats()...)
ss = append(ss, d.pp.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: d.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of packets coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "pps",
},
Valuer: d.statIncomingRate,
},
astikit.StatOptions{
Handler: d.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of packets processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "pps",
},
Valuer: d.statProcessedRate,
},
)

View File

@@ -412,14 +412,15 @@ func (d *Demuxer) probe() (err error) {
func (d *Demuxer) addStats() {
// Get stats
ss := d.d.stats()
ss = append(ss, d.p.stats()...)
ss = append(ss, astikit.StatOptions{
Handler: d.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of bits going in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "bps",
},
Valuer: d.statIncomingRate,
})
// Add stats

View File

@@ -144,24 +144,26 @@ func (e *Encoder) addStats() {
// Get stats
ss := e.c.Stats()
ss = append(ss, e.d.stats()...)
ss = append(ss, e.fp.stats()...)
ss = append(ss, e.pp.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: e.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "fps",
},
Valuer: e.statIncomingRate,
},
astikit.StatOptions{
Handler: e.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "fps",
},
Valuer: e.statProcessedRate,
},
)

View File

@@ -9,10 +9,11 @@ const (
// Stat names
const (
StatNameAverageDelay = "astilibav.average.delay"
StatNameFilledRate = "astilibav.filled.rate"
StatNameIncomingRate = "astilibav.incoming.rate"
StatNameOutgoingRate = "astilibav.outgoing.rate"
StatNameProcessedRate = "astilibav.processed.rate"
StatNameWorkRatio = "astilibav.work.ratio"
StatNameAllocatedFrames = "astilibav.allocated.frames"
StatNameAllocatedPackets = "astilibav.allocated.packets"
StatNameAverageDelay = "astilibav.average.delay"
StatNameFilledRate = "astilibav.filled.rate"
StatNameIncomingRate = "astilibav.incoming.rate"
StatNameOutgoingRate = "astilibav.outgoing.rate"
StatNameProcessedRate = "astilibav.processed.rate"
)

View File

@@ -221,24 +221,25 @@ func (f *Filterer) addStats() {
// Get stats
ss := f.c.Stats()
ss = append(ss, f.d.stats()...)
ss = append(ss, f.p.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: f.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "fps",
},
Valuer: f.statIncomingRate,
},
astikit.StatOptions{
Handler: f.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "fps",
},
Valuer: f.statProcessedRate,
},
)

View File

@@ -65,24 +65,25 @@ func (f *Forwarder) addStats() {
// Get stats
ss := f.c.Stats()
ss = append(ss, f.d.stats()...)
ss = append(ss, f.p.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: f.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "fps",
},
Valuer: f.statIncomingRate,
},
astikit.StatOptions{
Handler: f.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "fps",
},
Valuer: f.statProcessedRate,
},
)

View File

@@ -88,27 +88,29 @@ func (d *frameDispatcher) dispatch(f *astiav.Frame, descriptor Descriptor) {
func (d *frameDispatcher) stats() []astikit.StatOptions {
return []astikit.StatOptions{
{
Handler: d.statOutgoingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames going out per second",
Label: "Outgoing rate",
Name: StatNameOutgoingRate,
Unit: "fps",
},
Valuer: d.statOutgoingRate,
},
}
}
type framePool struct {
c astiencoder.Closer
m *sync.Mutex
p []*astiav.Frame
c astiencoder.Closer
m *sync.Mutex
p []*astiav.Frame
statAllocatedCount *astikit.CounterStat
}
func newFramePool(c astiencoder.Closer) *framePool {
return &framePool{
c: c,
m: &sync.Mutex{},
c: c,
m: &sync.Mutex{},
statAllocatedCount: astikit.NewCounterStat(),
}
}
@@ -117,6 +119,7 @@ func (p *framePool) get() (f *astiav.Frame) {
defer p.m.Unlock()
if len(p.p) == 0 {
f = astiav.AllocFrame()
p.statAllocatedCount.Add(1)
p.c.AddClose(f.Free)
return
}
@@ -131,3 +134,17 @@ func (p *framePool) put(f *astiav.Frame) {
f.Unref()
p.p = append(p.p, f)
}
func (p *framePool) stats() []astikit.StatOptions {
return []astikit.StatOptions{
{
Metadata: &astikit.StatMetadata{
Description: "Number of allocated frames",
Label: "Allocated frames",
Name: StatNameAllocatedFrames,
Unit: "f",
},
Valuer: p.statAllocatedCount,
},
}
}

View File

@@ -83,24 +83,25 @@ func (r *FrameRateEmulator) addStats() {
// Get stats
ss := r.c.Stats()
ss = append(ss, r.d.stats()...)
ss = append(ss, r.p.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: r.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "fps",
},
Valuer: r.statIncomingRate,
},
astikit.StatOptions{
Handler: r.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "fps",
},
Valuer: r.statProcessedRate,
},
)

View File

@@ -99,33 +99,34 @@ func NewMuxer(o MuxerOptions, eh *astiencoder.EventHandler, c *astikit.Closer, s
func (m *Muxer) addStats() {
// Get stats
ss := m.c.Stats()
ss = append(ss, m.p.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: m.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of packets coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "pps",
},
Valuer: m.statIncomingRate,
},
astikit.StatOptions{
Handler: m.statOutgoingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of bits going out per second",
Label: "Outgoing rate",
Name: StatNameOutgoingRate,
Unit: "bps",
},
Valuer: m.statOutgoingRate,
},
astikit.StatOptions{
Handler: m.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of packets processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "pps",
},
Valuer: m.statProcessedRate,
},
)

View File

@@ -92,13 +92,13 @@ func (d *pktDispatcher) dispatch(pkt *astiav.Packet, descriptor Descriptor) {
func (d *pktDispatcher) stats() []astikit.StatOptions {
return []astikit.StatOptions{
{
Handler: d.statOutgoingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of packets going out per second",
Label: "Outgoing rate",
Name: StatNameOutgoingRate,
Unit: "pps",
},
Valuer: d.statOutgoingRate,
},
}
}
@@ -133,15 +133,17 @@ func (c *pktCond) UsePkt(pkt *astiav.Packet) bool {
}
type pktPool struct {
c astiencoder.Closer
m *sync.Mutex
p []*astiav.Packet
c astiencoder.Closer
m *sync.Mutex
p []*astiav.Packet
statAllocatedCount *astikit.CounterStat
}
func newPktPool(c astiencoder.Closer) *pktPool {
return &pktPool{
c: c,
m: &sync.Mutex{},
c: c,
m: &sync.Mutex{},
statAllocatedCount: astikit.NewCounterStat(),
}
}
@@ -150,6 +152,7 @@ func (p *pktPool) get() (pkt *astiav.Packet) {
defer p.m.Unlock()
if len(p.p) == 0 {
pkt = astiav.AllocPacket()
p.statAllocatedCount.Add(1)
p.c.AddClose(pkt.Free)
return
}
@@ -164,3 +167,17 @@ func (p *pktPool) put(pkt *astiav.Packet) {
pkt.Unref()
p.p = append(p.p, pkt)
}
func (p *pktPool) stats() []astikit.StatOptions {
return []astikit.StatOptions{
{
Metadata: &astikit.StatMetadata{
Description: "Number of allocated packets",
Label: "Allocated packets",
Name: StatNameAllocatedPackets,
Unit: "p",
},
Valuer: p.statAllocatedCount,
},
}
}

View File

@@ -79,24 +79,25 @@ func NewPktDumper(o PktDumperOptions, eh *astiencoder.EventHandler, c *astikit.C
func (d *PktDumper) addStats() {
// Get stats
ss := d.c.Stats()
ss = append(ss, d.p.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: d.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of packets coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "pps",
},
Valuer: d.statIncomingRate,
},
astikit.StatOptions{
Handler: d.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of packets processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "pps",
},
Valuer: d.statProcessedRate,
},
)

View File

@@ -67,24 +67,25 @@ func (p *PktPiper) addStats() {
// Get stats
ss := p.c.Stats()
ss = append(ss, p.d.stats()...)
ss = append(ss, p.p.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: p.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of pkts coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "fps",
},
Valuer: p.statIncomingRate,
},
astikit.StatOptions{
Handler: p.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of pkts processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "fps",
},
Valuer: p.statProcessedRate,
},
)

View File

@@ -118,42 +118,43 @@ func (r *RateEnforcer) addStats() {
// Get stats
ss := r.c.Stats()
ss = append(ss, r.d.stats()...)
ss = append(ss, r.p.stats()...)
ss = append(ss,
astikit.StatOptions{
Handler: r.statIncomingRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames coming in per second",
Label: "Incoming rate",
Name: StatNameIncomingRate,
Unit: "fps",
},
Valuer: r.statIncomingRate,
},
astikit.StatOptions{
Handler: r.statProcessedRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames processed per second",
Label: "Processed rate",
Name: StatNameProcessedRate,
Unit: "fps",
},
Valuer: r.statProcessedRate,
},
astikit.StatOptions{
Handler: r.statDelayAvg,
Metadata: &astikit.StatMetadata{
Description: "Average delay of frames coming in",
Label: "Average delay",
Name: StatNameAverageDelay,
Unit: "ns",
},
Valuer: r.statDelayAvg,
},
astikit.StatOptions{
Handler: r.statFilledRate,
Metadata: &astikit.StatMetadata{
Description: "Number of frames filled per second",
Label: "Filled rate",
Name: StatNameFilledRate,
Unit: "fps",
},
Valuer: r.statFilledRate,
},
)

12
node.go
View File

@@ -296,17 +296,12 @@ func (n *BaseNode) Start(ctx context.Context, tc CreateTaskFunc, execFunc BaseNo
// Handle stats
if n.s != nil {
// Make sure to stop and delete stats
// Make sure to delete stats
defer func() {
// Lock
n.m.Lock()
defer n.m.Unlock()
// Stop stats
for _, s := range n.ss {
s.Handler.Stop()
}
// Delete stats
n.s.DelStats(n.target, n.ss...)
}()
@@ -314,11 +309,6 @@ func (n *BaseNode) Start(ctx context.Context, tc CreateTaskFunc, execFunc BaseNo
// Add stats
n.m.Lock()
n.s.AddStats(n.target, n.ss...)
// Start stats
for _, s := range n.ss {
s.Handler.Start()
}
n.m.Unlock()
}

19
stat.go
View File

@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/asticode/go-astikit"
@@ -131,8 +130,7 @@ type statHostUsageMemory struct {
}
type statPSUtil struct {
p *process.Process
started uint32
p *process.Process
}
func newStatPSUtil() (u *statPSUtil, err error) {
@@ -147,20 +145,7 @@ func newStatPSUtil() (u *statPSUtil, err error) {
return
}
func (s *statPSUtil) Start() {
atomic.SwapUint32(&s.started, 1)
}
func (s *statPSUtil) Stop() {
atomic.SwapUint32(&s.started, 0)
}
func (s *statPSUtil) Value(delta time.Duration) interface{} {
// Check started
if atomic.LoadUint32(&s.started) == 0 {
return nil
}
func (s *statPSUtil) Value() interface{} {
// Get CPU
var v statHostUsage
if vs, err := cpu.Percent(0, true); err == nil {

View File

@@ -46,8 +46,8 @@ func (w *Workflow) AddDefaultStats() (err error) {
// Add host usage stat
w.bn.AddStats(astikit.StatOptions{
Handler: u,
Metadata: &astikit.StatMetadata{Name: StatNameHostUsage},
Valuer: u,
})
return
}