Files
core/monitor/restream.go
2023-04-11 14:52:27 +02:00

160 lines
7.6 KiB
Go

package monitor
import (
"strconv"
"github.com/datarhei/core/v16/monitor/metric"
"github.com/datarhei/core/v16/restream"
)
type restreamCollector struct {
prefix string
r restream.Restreamer
restreamProcessDescr *metric.Description
restreamProcessStatesDescr *metric.Description
restreamProcessIODescr *metric.Description
restreamStatesDescr *metric.Description
}
func NewRestreamCollector(r restream.Restreamer) metric.Collector {
c := &restreamCollector{
prefix: "restream",
r: r,
}
c.restreamProcessDescr = metric.NewDesc("restream_process", "Current process values by name", []string{"processid", "state", "order", "name"})
c.restreamProcessStatesDescr = metric.NewDesc("restream_process_states", "Current process state", []string{"processid", "state"})
c.restreamProcessIODescr = metric.NewDesc("restream_io", "Current process IO values by name", []string{"processid", "type", "id", "address", "index", "stream", "media", "name"})
c.restreamStatesDescr = metric.NewDesc("restream_state", "Summarized current process states", []string{"state"})
return c
}
func (c *restreamCollector) Prefix() string {
return c.prefix
}
func (c *restreamCollector) Describe() []*metric.Description {
return []*metric.Description{
c.restreamProcessDescr,
c.restreamProcessStatesDescr,
c.restreamProcessIODescr,
c.restreamStatesDescr,
}
}
func (c *restreamCollector) Collect() metric.Metrics {
metrics := metric.NewMetrics()
value := float64(0)
states := map[string]float64{
"failed": 0,
"finished": 0,
"finishing": 0,
"killed": 0,
"running": 0,
"starting": 0,
}
ids := c.r.GetProcessIDs("", "")
for _, id := range ids {
state, _ := c.r.GetProcessState(id)
if state == nil {
continue
}
proc, _ := c.r.GetProcess(id)
if proc == nil {
continue
}
states[state.State]++
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Frame), id, state.State, state.Order, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.FPS), id, state.State, state.Order, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Speed), id, state.State, state.Order, "speed"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Quantizer, id, state.State, state.Order, "q"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Size), id, state.State, state.Order, "size"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Time, id, state.State, state.Order, "time"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Drop), id, state.State, state.Order, "drop"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Dup), id, state.State, state.Order, "dup"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Packet), id, state.State, state.Order, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Bitrate, id, state.State, state.Order, "bitrate"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.CPU, id, state.State, state.Order, "cpu"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Memory), id, state.State, state.Order, "memory"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Duration, id, state.State, state.Order, "uptime"))
if proc.Config != nil {
metrics.Add(metric.NewValue(c.restreamProcessDescr, proc.Config.LimitCPU, id, state.State, state.Order, "cpu_limit"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(proc.Config.LimitMemory), id, state.State, state.Order, "memory_limit"))
}
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Failed), id, "failed"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Finished), id, "finished"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Finishing), id, "finishing"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Killed), id, "killed"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Running), id, "running"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Starting), id, "starting"))
for i := range state.Progress.Input {
io := &state.Progress.Input[i]
index := strconv.FormatUint(io.Index, 10)
stream := strconv.FormatUint(io.Stream, 10)
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Frame), id, "input", io.ID, io.Address, index, stream, io.Type, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.FPS), id, "input", io.ID, io.Address, index, stream, io.Type, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Packet), id, "input", io.ID, io.Address, index, stream, io.Type, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.PPS), id, "input", io.ID, io.Address, index, stream, io.Type, "pps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Size), id, "input", io.ID, io.Address, index, stream, io.Type, "size"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Bitrate), id, "input", io.ID, io.Address, index, stream, io.Type, "bitrate"))
if io.AVstream != nil {
a := io.AVstream
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Queue), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_queue"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Dup), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_dup"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Drop), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_drop"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Enc), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_enc"))
value = 0
if a.Looping {
value = 1
}
metrics.Add(metric.NewValue(c.restreamProcessIODescr, value, id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_looping"))
value = 0
if a.Duplicating {
value = 1
}
metrics.Add(metric.NewValue(c.restreamProcessIODescr, value, id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_duplicating"))
}
}
for i := range state.Progress.Output {
io := &state.Progress.Output[i]
index := strconv.FormatUint(io.Index, 10)
stream := strconv.FormatUint(io.Stream, 10)
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Frame), id, "output", io.ID, io.Address, index, stream, io.Type, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.FPS), id, "output", io.ID, io.Address, index, stream, io.Type, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Packet), id, "output", io.ID, io.Address, index, stream, io.Type, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.PPS), id, "output", io.ID, io.Address, index, stream, io.Type, "pps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Size), id, "output", io.ID, io.Address, index, stream, io.Type, "size"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Bitrate), id, "output", io.ID, io.Address, index, stream, io.Type, "bitrate"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Quantizer), id, "output", io.ID, io.Address, index, stream, io.Type, "q"))
}
}
for state, value := range states {
metrics.Add(metric.NewValue(c.restreamStatesDescr, value, state))
}
return metrics
}
func (c *restreamCollector) Stop() {}