metrics: allow filtering metrics (#953) (#4809)

This commit is contained in:
Alessandro Ros
2025-08-02 13:10:57 +02:00
committed by GitHub
parent 21404a6821
commit cc48fdb2b8
4 changed files with 531 additions and 198 deletions

View File

@@ -2253,6 +2253,20 @@ webrtc_sessions_rtcp_packets_received{id="[id]",path="[path]",remoteAddr="[remot
webrtc_sessions_rtcp_packets_sent{id="[id]",path="[path]",remoteAddr="[remoteAddr]",state="[state]"} 123
```
Metrics can be tuned and filtered by using query parameters:
* `type=[TYPE]`: show metrics of a certain type only (where TYPE can be `paths`, `hls_muxers`, `rtsp_conns`, `rtsp_sessions`, `rtsps_conns`, `rtsps_sessions`, `rtmp_conns`, `rtmps_conns`, `srt_conns`, `webrtc_sessions`)
* `path=[PATH]`: show metrics belonging to a specific path only
* `hls_muxer=[PATH]`: show metrics belonging to a specific HLS muxer only
* `rtsp_conn=[ID]` show metrics belonging to a specific RTSP connection only
* `rtsp_session=[SESSION]`: show metrics belonging to a specific RTSP session only
* `rtsps_conn=[ID]` show metrics belonging to a specific RTSPS connection only
* `rtsps_session=[SESSION]`: show metrics belonging to a specific RTSPS session only
* `rtmp_conn=[ID]` show metrics belonging to a specific RTMP connection only
* `rtmps_conn=[ID]` show metrics belonging to a specific RTMPS connection only
* `srt_conn=[ID]` show metrics belonging to a specific SRT connection only
* `webrtc_session=[ID]` show metrics belonging to a specific WebRTC session only
### pprof
A performance monitor, compatible with pprof, can be enabled with the parameter `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like:

View File

@@ -74,6 +74,9 @@ func TestMetrics(t *testing.T) {
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
require.Equal(t, `paths 0
paths_bytes_received 0
paths_bytes_sent 0
paths_readers 0
hls_muxers 0
hls_muxers_bytes_sent 0
rtsp_conns 0
@@ -490,6 +493,10 @@ webrtc_sessions_rtcp_packets_sent 0
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
require.Equal(t, "paths 0\n", string(bo))
require.Equal(t, "paths 0\n"+
"paths_bytes_received 0\n"+
"paths_bytes_sent 0\n"+
"paths_readers 0\n",
string(bo))
})
}

View File

@@ -175,92 +175,130 @@ func (m *Metrics) middlewareAuth(ctx *gin.Context) {
}
func (m *Metrics) onMetrics(ctx *gin.Context) {
typ := ctx.Query("type")
pathFilter := ctx.Query("path")
hlsMuxerFilter := ctx.Query("hls_muxer")
rtspConnFilter := ctx.Query("rtsp_conn")
rtspSessionFilter := ctx.Query("rtsp_session")
rtspsConnFilter := ctx.Query("rtsps_conn")
rtspsSessionFilter := ctx.Query("rtsps_session")
rtmpConnFilter := ctx.Query("rtmp_conn")
rtmpsConnFilter := ctx.Query("rtmps_conn")
srtConnFilter := ctx.Query("srt_conn")
webrtcSessionFilter := ctx.Query("webrtc_session")
anyFilterActive := pathFilter != "" ||
hlsMuxerFilter != "" ||
rtspConnFilter != "" ||
rtspSessionFilter != "" ||
rtspsConnFilter != "" ||
rtspsSessionFilter != "" ||
rtmpConnFilter != "" ||
rtmpsConnFilter != "" ||
srtConnFilter != "" ||
webrtcSessionFilter != ""
out := ""
data, err := m.pathManager.APIPathsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
var state string
if i.Ready {
state = "ready"
} else {
state = "notReady"
}
ta := tags(map[string]string{
"name": i.Name,
"state": state,
})
out += metric("paths", ta, 1)
out += metric("paths_bytes_received", ta, int64(i.BytesReceived))
out += metric("paths_bytes_sent", ta, int64(i.BytesSent))
out += metric("paths_readers", ta, int64(len(i.Readers)))
}
} else {
out += metric("paths", "", 0)
}
if !interfaceIsEmpty(m.hlsServer) {
var data *defs.APIHLSMuxerList
data, err = m.hlsServer.APIMuxersList()
if (typ == "" || typ == "paths") && (!anyFilterActive || pathFilter != "") {
data, err := m.pathManager.APIPathsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"name": i.Path,
})
out += metric("hls_muxers", ta, 1)
out += metric("hls_muxers_bytes_sent", ta, int64(i.BytesSent))
if pathFilter == "" || pathFilter == i.Name {
var state string
if i.Ready {
state = "ready"
} else {
state = "notReady"
}
ta := tags(map[string]string{
"name": i.Name,
"state": state,
})
out += metric("paths", ta, 1)
out += metric("paths_bytes_received", ta, int64(i.BytesReceived))
out += metric("paths_bytes_sent", ta, int64(i.BytesSent))
out += metric("paths_readers", ta, int64(len(i.Readers)))
}
}
} else {
} else if pathFilter == "" {
out += metric("paths", "", 0)
out += metric("paths_bytes_received", "", 0)
out += metric("paths_bytes_sent", "", 0)
out += metric("paths_readers", "", 0)
}
}
if !interfaceIsEmpty(m.hlsServer) &&
(typ == "" || typ == "hls_muxers") &&
(!anyFilterActive || hlsMuxerFilter != "") {
var data *defs.APIHLSMuxerList
data, err := m.hlsServer.APIMuxersList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
if hlsMuxerFilter == "" || hlsMuxerFilter == i.Path {
ta := tags(map[string]string{
"name": i.Path,
})
out += metric("hls_muxers", ta, 1)
out += metric("hls_muxers_bytes_sent", ta, int64(i.BytesSent))
}
}
} else if hlsMuxerFilter == "" {
out += metric("hls_muxers", "", 0)
out += metric("hls_muxers_bytes_sent", "", 0)
}
}
if !interfaceIsEmpty(m.rtspServer) { //nolint:dupl
func() {
if (typ == "" || typ == "rtsp_conns") && (!anyFilterActive || rtspConnFilter != "") {
var data *defs.APIRTSPConnsList
data, err = m.rtspServer.APIConnsList()
data, err := m.rtspServer.APIConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
})
out += metric("rtsp_conns", ta, 1)
out += metric("rtsp_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsp_conns_bytes_sent", ta, int64(i.BytesSent))
if rtspConnFilter == "" || rtspConnFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
})
out += metric("rtsp_conns", ta, 1)
out += metric("rtsp_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsp_conns_bytes_sent", ta, int64(i.BytesSent))
}
}
} else {
} else if rtspConnFilter == "" {
out += metric("rtsp_conns", "", 0)
out += metric("rtsp_conns_bytes_received", "", 0)
out += metric("rtsp_conns_bytes_sent", "", 0)
}
}()
}
func() {
if (typ == "" || typ == "rtsp_sessions") && (!anyFilterActive || rtspSessionFilter != "") {
var data *defs.APIRTSPSessionList
data, err = m.rtspServer.APISessionsList()
data, err := m.rtspServer.APISessionsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtsp_sessions", ta, 1)
out += metric("rtsp_sessions_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsp_sessions_bytes_sent", ta, int64(i.BytesSent))
out += metric("rtsp_sessions_rtp_packets_received", ta, int64(i.RTPPacketsReceived))
out += metric("rtsp_sessions_rtp_packets_sent", ta, int64(i.RTPPacketsSent))
out += metric("rtsp_sessions_rtp_packets_lost", ta, int64(i.RTPPacketsLost))
out += metric("rtsp_sessions_rtp_packets_in_error", ta, int64(i.RTPPacketsInError))
out += metricFloat("rtsp_sessions_rtp_packets_jitter", ta, i.RTPPacketsJitter)
out += metric("rtsp_sessions_rtcp_packets_received", ta, int64(i.RTCPPacketsReceived))
out += metric("rtsp_sessions_rtcp_packets_sent", ta, int64(i.RTCPPacketsSent))
out += metric("rtsp_sessions_rtcp_packets_in_error", ta, int64(i.RTCPPacketsInError))
if rtspSessionFilter == "" || rtspSessionFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtsp_sessions", ta, 1)
out += metric("rtsp_sessions_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsp_sessions_bytes_sent", ta, int64(i.BytesSent))
out += metric("rtsp_sessions_rtp_packets_received", ta, int64(i.RTPPacketsReceived))
out += metric("rtsp_sessions_rtp_packets_sent", ta, int64(i.RTPPacketsSent))
out += metric("rtsp_sessions_rtp_packets_lost", ta, int64(i.RTPPacketsLost))
out += metric("rtsp_sessions_rtp_packets_in_error", ta, int64(i.RTPPacketsInError))
out += metricFloat("rtsp_sessions_rtp_packets_jitter", ta, i.RTPPacketsJitter)
out += metric("rtsp_sessions_rtcp_packets_received", ta, int64(i.RTCPPacketsReceived))
out += metric("rtsp_sessions_rtcp_packets_sent", ta, int64(i.RTCPPacketsSent))
out += metric("rtsp_sessions_rtcp_packets_in_error", ta, int64(i.RTCPPacketsInError))
}
}
} else {
} else if rtspSessionFilter == "" {
out += metric("rtsp_sessions", "", 0)
out += metric("rtsp_sessions_bytes_received", "", 0)
out += metric("rtsp_sessions_bytes_sent", "", 0)
@@ -273,53 +311,57 @@ func (m *Metrics) onMetrics(ctx *gin.Context) {
out += metric("rtsp_sessions_rtcp_packets_sent", "", 0)
out += metric("rtsp_sessions_rtcp_packets_in_error", "", 0)
}
}()
}
}
if !interfaceIsEmpty(m.rtspsServer) { //nolint:dupl
func() {
if (typ == "" || typ == "rtsps_conns") && (!anyFilterActive || rtspsConnFilter != "") {
var data *defs.APIRTSPConnsList
data, err = m.rtspsServer.APIConnsList()
data, err := m.rtspsServer.APIConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
})
out += metric("rtsps_conns", ta, 1)
out += metric("rtsps_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsps_conns_bytes_sent", ta, int64(i.BytesSent))
if rtspsConnFilter == "" || rtspsConnFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
})
out += metric("rtsps_conns", ta, 1)
out += metric("rtsps_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsps_conns_bytes_sent", ta, int64(i.BytesSent))
}
}
} else {
} else if rtspsConnFilter == "" {
out += metric("rtsps_conns", "", 0)
out += metric("rtsps_conns_bytes_received", "", 0)
out += metric("rtsps_conns_bytes_sent", "", 0)
}
}()
}
func() {
if (typ == "" || typ == "rtsps_sessions") && (!anyFilterActive || rtspsSessionFilter != "") {
var data *defs.APIRTSPSessionList
data, err = m.rtspsServer.APISessionsList()
data, err := m.rtspsServer.APISessionsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtsps_sessions", ta, 1)
out += metric("rtsps_sessions_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsps_sessions_bytes_sent", ta, int64(i.BytesSent))
out += metric("rtsps_sessions_rtp_packets_received", ta, int64(i.RTPPacketsReceived))
out += metric("rtsps_sessions_rtp_packets_sent", ta, int64(i.RTPPacketsSent))
out += metric("rtsps_sessions_rtp_packets_lost", ta, int64(i.RTPPacketsLost))
out += metric("rtsps_sessions_rtp_packets_in_error", ta, int64(i.RTPPacketsInError))
out += metricFloat("rtsps_sessions_rtp_packets_jitter", ta, i.RTPPacketsJitter)
out += metric("rtsps_sessions_rtcp_packets_received", ta, int64(i.RTCPPacketsReceived))
out += metric("rtsps_sessions_rtcp_packets_sent", ta, int64(i.RTCPPacketsSent))
out += metric("rtsps_sessions_rtcp_packets_in_error", ta, int64(i.RTCPPacketsInError))
if rtspsSessionFilter == "" || rtspsSessionFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtsps_sessions", ta, 1)
out += metric("rtsps_sessions_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtsps_sessions_bytes_sent", ta, int64(i.BytesSent))
out += metric("rtsps_sessions_rtp_packets_received", ta, int64(i.RTPPacketsReceived))
out += metric("rtsps_sessions_rtp_packets_sent", ta, int64(i.RTPPacketsSent))
out += metric("rtsps_sessions_rtp_packets_lost", ta, int64(i.RTPPacketsLost))
out += metric("rtsps_sessions_rtp_packets_in_error", ta, int64(i.RTPPacketsInError))
out += metricFloat("rtsps_sessions_rtp_packets_jitter", ta, i.RTPPacketsJitter)
out += metric("rtsps_sessions_rtcp_packets_received", ta, int64(i.RTCPPacketsReceived))
out += metric("rtsps_sessions_rtcp_packets_sent", ta, int64(i.RTCPPacketsSent))
out += metric("rtsps_sessions_rtcp_packets_in_error", ta, int64(i.RTCPPacketsInError))
}
}
} else {
} else if rtspsSessionFilter == "" {
out += metric("rtsps_sessions", "", 0)
out += metric("rtsps_sessions_bytes_received", "", 0)
out += metric("rtsps_sessions_bytes_sent", "", 0)
@@ -332,118 +374,130 @@ func (m *Metrics) onMetrics(ctx *gin.Context) {
out += metric("rtsps_sessions_rtcp_packets_sent", "", 0)
out += metric("rtsps_sessions_rtcp_packets_in_error", "", 0)
}
}()
}
}
if !interfaceIsEmpty(m.rtmpServer) {
if !interfaceIsEmpty(m.rtmpServer) && //nolint:dupl
(typ == "" || typ == "rtmp_conns") &&
(!anyFilterActive || rtmpConnFilter != "") {
var data *defs.APIRTMPConnList
data, err = m.rtmpServer.APIConnsList()
data, err := m.rtmpServer.APIConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtmp_conns", ta, 1)
out += metric("rtmp_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtmp_conns_bytes_sent", ta, int64(i.BytesSent))
if rtmpConnFilter == "" || rtmpConnFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtmp_conns", ta, 1)
out += metric("rtmp_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtmp_conns_bytes_sent", ta, int64(i.BytesSent))
}
}
} else {
} else if rtmpConnFilter == "" {
out += metric("rtmp_conns", "", 0)
out += metric("rtmp_conns_bytes_received", "", 0)
out += metric("rtmp_conns_bytes_sent", "", 0)
}
}
if !interfaceIsEmpty(m.rtmpsServer) {
if !interfaceIsEmpty(m.rtmpsServer) && //nolint:dupl
(typ == "" || typ == "rtmp_conns") &&
(!anyFilterActive || rtmpsConnFilter != "") {
var data *defs.APIRTMPConnList
data, err = m.rtmpsServer.APIConnsList()
data, err := m.rtmpsServer.APIConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtmps_conns", ta, 1)
out += metric("rtmps_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtmps_conns_bytes_sent", ta, int64(i.BytesSent))
if rtmpsConnFilter == "" || rtmpsConnFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("rtmps_conns", ta, 1)
out += metric("rtmps_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("rtmps_conns_bytes_sent", ta, int64(i.BytesSent))
}
}
} else {
} else if rtmpsConnFilter == "" {
out += metric("rtmps_conns", "", 0)
out += metric("rtmps_conns_bytes_received", "", 0)
out += metric("rtmps_conns_bytes_sent", "", 0)
}
}
if !interfaceIsEmpty(m.srtServer) {
if !interfaceIsEmpty(m.srtServer) &&
(typ == "" || typ == "srt_conns") &&
(!anyFilterActive || srtConnFilter != "") {
var data *defs.APISRTConnList
data, err = m.srtServer.APIConnsList()
data, err := m.srtServer.APIConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("srt_conns", ta, 1)
out += metric("srt_conns_packets_sent", ta, int64(i.PacketsSent))
out += metric("srt_conns_packets_received", ta, int64(i.PacketsReceived))
out += metric("srt_conns_packets_sent_unique", ta, int64(i.PacketsSentUnique))
out += metric("srt_conns_packets_received_unique", ta, int64(i.PacketsReceivedUnique))
out += metric("srt_conns_packets_send_loss", ta, int64(i.PacketsSendLoss))
out += metric("srt_conns_packets_received_loss", ta, int64(i.PacketsReceivedLoss))
out += metric("srt_conns_packets_retrans", ta, int64(i.PacketsRetrans))
out += metric("srt_conns_packets_received_retrans", ta, int64(i.PacketsReceivedRetrans))
out += metric("srt_conns_packets_sent_ack", ta, int64(i.PacketsSentACK))
out += metric("srt_conns_packets_received_ack", ta, int64(i.PacketsReceivedACK))
out += metric("srt_conns_packets_sent_nak", ta, int64(i.PacketsSentNAK))
out += metric("srt_conns_packets_received_nak", ta, int64(i.PacketsReceivedNAK))
out += metric("srt_conns_packets_sent_km", ta, int64(i.PacketsSentKM))
out += metric("srt_conns_packets_received_km", ta, int64(i.PacketsReceivedKM))
out += metric("srt_conns_us_snd_duration", ta, int64(i.UsSndDuration))
out += metric("srt_conns_packets_send_drop", ta, int64(i.PacketsSendDrop))
out += metric("srt_conns_packets_received_drop", ta, int64(i.PacketsReceivedDrop))
out += metric("srt_conns_packets_received_undecrypt", ta, int64(i.PacketsReceivedUndecrypt))
out += metric("srt_conns_bytes_sent", ta, int64(i.BytesSent))
out += metric("srt_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("srt_conns_bytes_sent_unique", ta, int64(i.BytesSentUnique))
out += metric("srt_conns_bytes_received_unique", ta, int64(i.BytesReceivedUnique))
out += metric("srt_conns_bytes_received_loss", ta, int64(i.BytesReceivedLoss))
out += metric("srt_conns_bytes_retrans", ta, int64(i.BytesRetrans))
out += metric("srt_conns_bytes_received_retrans", ta, int64(i.BytesReceivedRetrans))
out += metric("srt_conns_bytes_send_drop", ta, int64(i.BytesSendDrop))
out += metric("srt_conns_bytes_received_drop", ta, int64(i.BytesReceivedDrop))
out += metric("srt_conns_bytes_received_undecrypt", ta, int64(i.BytesReceivedUndecrypt))
out += metricFloat("srt_conns_us_packets_send_period", ta, i.UsPacketsSendPeriod)
out += metric("srt_conns_packets_flow_window", ta, int64(i.PacketsFlowWindow))
out += metric("srt_conns_packets_flight_size", ta, int64(i.PacketsFlightSize))
out += metricFloat("srt_conns_ms_rtt", ta, i.MsRTT)
out += metricFloat("srt_conns_mbps_send_rate", ta, i.MbpsSendRate)
out += metricFloat("srt_conns_mbps_receive_rate", ta, i.MbpsReceiveRate)
out += metricFloat("srt_conns_mbps_link_capacity", ta, i.MbpsLinkCapacity)
out += metric("srt_conns_bytes_avail_send_buf", ta, int64(i.BytesAvailSendBuf))
out += metric("srt_conns_bytes_avail_receive_buf", ta, int64(i.BytesAvailReceiveBuf))
out += metricFloat("srt_conns_mbps_max_bw", ta, i.MbpsMaxBW)
out += metric("srt_conns_bytes_mss", ta, int64(i.ByteMSS))
out += metric("srt_conns_packets_send_buf", ta, int64(i.PacketsSendBuf))
out += metric("srt_conns_bytes_send_buf", ta, int64(i.BytesSendBuf))
out += metric("srt_conns_ms_send_buf", ta, int64(i.MsSendBuf))
out += metric("srt_conns_ms_send_tsb_pd_delay", ta, int64(i.MsSendTsbPdDelay))
out += metric("srt_conns_packets_receive_buf", ta, int64(i.PacketsReceiveBuf))
out += metric("srt_conns_bytes_receive_buf", ta, int64(i.BytesReceiveBuf))
out += metric("srt_conns_ms_receive_buf", ta, int64(i.MsReceiveBuf))
out += metric("srt_conns_ms_receive_tsb_pd_delay", ta, int64(i.MsReceiveTsbPdDelay))
out += metric("srt_conns_packets_reorder_tolerance", ta, int64(i.PacketsReorderTolerance))
out += metric("srt_conns_packets_received_avg_belated_time", ta, int64(i.PacketsReceivedAvgBelatedTime))
out += metricFloat("srt_conns_packets_send_loss_rate", ta, i.PacketsSendLossRate)
out += metricFloat("srt_conns_packets_received_loss_rate", ta, i.PacketsReceivedLossRate)
if srtConnFilter == "" || srtConnFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("srt_conns", ta, 1)
out += metric("srt_conns_packets_sent", ta, int64(i.PacketsSent))
out += metric("srt_conns_packets_received", ta, int64(i.PacketsReceived))
out += metric("srt_conns_packets_sent_unique", ta, int64(i.PacketsSentUnique))
out += metric("srt_conns_packets_received_unique", ta, int64(i.PacketsReceivedUnique))
out += metric("srt_conns_packets_send_loss", ta, int64(i.PacketsSendLoss))
out += metric("srt_conns_packets_received_loss", ta, int64(i.PacketsReceivedLoss))
out += metric("srt_conns_packets_retrans", ta, int64(i.PacketsRetrans))
out += metric("srt_conns_packets_received_retrans", ta, int64(i.PacketsReceivedRetrans))
out += metric("srt_conns_packets_sent_ack", ta, int64(i.PacketsSentACK))
out += metric("srt_conns_packets_received_ack", ta, int64(i.PacketsReceivedACK))
out += metric("srt_conns_packets_sent_nak", ta, int64(i.PacketsSentNAK))
out += metric("srt_conns_packets_received_nak", ta, int64(i.PacketsReceivedNAK))
out += metric("srt_conns_packets_sent_km", ta, int64(i.PacketsSentKM))
out += metric("srt_conns_packets_received_km", ta, int64(i.PacketsReceivedKM))
out += metric("srt_conns_us_snd_duration", ta, int64(i.UsSndDuration))
out += metric("srt_conns_packets_send_drop", ta, int64(i.PacketsSendDrop))
out += metric("srt_conns_packets_received_drop", ta, int64(i.PacketsReceivedDrop))
out += metric("srt_conns_packets_received_undecrypt", ta, int64(i.PacketsReceivedUndecrypt))
out += metric("srt_conns_bytes_sent", ta, int64(i.BytesSent))
out += metric("srt_conns_bytes_received", ta, int64(i.BytesReceived))
out += metric("srt_conns_bytes_sent_unique", ta, int64(i.BytesSentUnique))
out += metric("srt_conns_bytes_received_unique", ta, int64(i.BytesReceivedUnique))
out += metric("srt_conns_bytes_received_loss", ta, int64(i.BytesReceivedLoss))
out += metric("srt_conns_bytes_retrans", ta, int64(i.BytesRetrans))
out += metric("srt_conns_bytes_received_retrans", ta, int64(i.BytesReceivedRetrans))
out += metric("srt_conns_bytes_send_drop", ta, int64(i.BytesSendDrop))
out += metric("srt_conns_bytes_received_drop", ta, int64(i.BytesReceivedDrop))
out += metric("srt_conns_bytes_received_undecrypt", ta, int64(i.BytesReceivedUndecrypt))
out += metricFloat("srt_conns_us_packets_send_period", ta, i.UsPacketsSendPeriod)
out += metric("srt_conns_packets_flow_window", ta, int64(i.PacketsFlowWindow))
out += metric("srt_conns_packets_flight_size", ta, int64(i.PacketsFlightSize))
out += metricFloat("srt_conns_ms_rtt", ta, i.MsRTT)
out += metricFloat("srt_conns_mbps_send_rate", ta, i.MbpsSendRate)
out += metricFloat("srt_conns_mbps_receive_rate", ta, i.MbpsReceiveRate)
out += metricFloat("srt_conns_mbps_link_capacity", ta, i.MbpsLinkCapacity)
out += metric("srt_conns_bytes_avail_send_buf", ta, int64(i.BytesAvailSendBuf))
out += metric("srt_conns_bytes_avail_receive_buf", ta, int64(i.BytesAvailReceiveBuf))
out += metricFloat("srt_conns_mbps_max_bw", ta, i.MbpsMaxBW)
out += metric("srt_conns_bytes_mss", ta, int64(i.ByteMSS))
out += metric("srt_conns_packets_send_buf", ta, int64(i.PacketsSendBuf))
out += metric("srt_conns_bytes_send_buf", ta, int64(i.BytesSendBuf))
out += metric("srt_conns_ms_send_buf", ta, int64(i.MsSendBuf))
out += metric("srt_conns_ms_send_tsb_pd_delay", ta, int64(i.MsSendTsbPdDelay))
out += metric("srt_conns_packets_receive_buf", ta, int64(i.PacketsReceiveBuf))
out += metric("srt_conns_bytes_receive_buf", ta, int64(i.BytesReceiveBuf))
out += metric("srt_conns_ms_receive_buf", ta, int64(i.MsReceiveBuf))
out += metric("srt_conns_ms_receive_tsb_pd_delay", ta, int64(i.MsReceiveTsbPdDelay))
out += metric("srt_conns_packets_reorder_tolerance", ta, int64(i.PacketsReorderTolerance))
out += metric("srt_conns_packets_received_avg_belated_time", ta, int64(i.PacketsReceivedAvgBelatedTime))
out += metricFloat("srt_conns_packets_send_loss_rate", ta, i.PacketsSendLossRate)
out += metricFloat("srt_conns_packets_received_loss_rate", ta, i.PacketsReceivedLossRate)
}
}
} else {
} else if srtConnFilter == "" {
out += metric("srt_conns", "", 0)
out += metric("srt_conns_packets_sent", "", 0)
out += metric("srt_conns_packets_received", "", 0)
@@ -499,28 +553,32 @@ func (m *Metrics) onMetrics(ctx *gin.Context) {
}
}
if !interfaceIsEmpty(m.webRTCServer) {
if !interfaceIsEmpty(m.webRTCServer) &&
(typ == "" || typ == "webrtc_sessions") &&
(!anyFilterActive || webrtcSessionFilter != "") {
var data *defs.APIWebRTCSessionList
data, err = m.webRTCServer.APISessionsList()
data, err := m.webRTCServer.APISessionsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("webrtc_sessions", ta, 1)
out += metric("webrtc_sessions_bytes_received", ta, int64(i.BytesReceived))
out += metric("webrtc_sessions_bytes_sent", ta, int64(i.BytesSent))
out += metric("webrtc_sessions_rtp_packets_received", ta, int64(i.RTPPacketsReceived))
out += metric("webrtc_sessions_rtp_packets_sent", ta, int64(i.RTPPacketsSent))
out += metric("webrtc_sessions_rtp_packets_lost", ta, int64(i.RTPPacketsLost))
out += metricFloat("webrtc_sessions_rtp_packets_jitter", ta, i.RTPPacketsJitter)
out += metric("webrtc_sessions_rtcp_packets_received", ta, int64(i.RTCPPacketsReceived))
out += metric("webrtc_sessions_rtcp_packets_sent", ta, int64(i.RTCPPacketsSent))
if webrtcSessionFilter == "" || webrtcSessionFilter == i.ID.String() {
ta := tags(map[string]string{
"id": i.ID.String(),
"state": string(i.State),
"path": i.Path,
"remoteAddr": i.RemoteAddr,
})
out += metric("webrtc_sessions", ta, 1)
out += metric("webrtc_sessions_bytes_received", ta, int64(i.BytesReceived))
out += metric("webrtc_sessions_bytes_sent", ta, int64(i.BytesSent))
out += metric("webrtc_sessions_rtp_packets_received", ta, int64(i.RTPPacketsReceived))
out += metric("webrtc_sessions_rtp_packets_sent", ta, int64(i.RTPPacketsSent))
out += metric("webrtc_sessions_rtp_packets_lost", ta, int64(i.RTPPacketsLost))
out += metricFloat("webrtc_sessions_rtp_packets_jitter", ta, i.RTPPacketsJitter)
out += metric("webrtc_sessions_rtcp_packets_received", ta, int64(i.RTCPPacketsReceived))
out += metric("webrtc_sessions_rtcp_packets_sent", ta, int64(i.RTCPPacketsSent))
}
}
} else {
} else if webrtcSessionFilter == "" {
out += metric("webrtc_sessions", "", 0)
out += metric("webrtc_sessions_bytes_received", "", 0)
out += metric("webrtc_sessions_bytes_sent", "", 0)

View File

@@ -68,6 +68,88 @@ func (dummyHLSServer) APIMuxersGet(string) (*defs.APIHLSMuxer, error) {
panic("unused")
}
type dummyRTSPServer struct{}
func (dummyRTSPServer) APIConnsList() (*defs.APIRTSPConnsList, error) {
return &defs.APIRTSPConnsList{
ItemCount: 1,
PageCount: 1,
Items: []*defs.APIRTSPConn{{
ID: uuid.MustParse("18294761-f9d1-4ea9-9a35-fe265b62eb41"),
Created: time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC),
RemoteAddr: "124.5.5.5:34542",
BytesReceived: 123,
BytesSent: 456,
Session: nil,
}},
}, nil
}
func (dummyRTSPServer) APIConnsGet(uuid.UUID) (*defs.APIRTSPConn, error) {
panic("unused")
}
func (dummyRTSPServer) APISessionsList() (*defs.APIRTSPSessionList, error) {
return &defs.APIRTSPSessionList{
ItemCount: 1,
PageCount: 1,
Items: []*defs.APIRTSPSession{{
ID: uuid.MustParse("124b22ce-9c34-4387-b045-44caf98049f7"),
Created: time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC),
RemoteAddr: "124.5.5.5:34542",
State: defs.APIRTSPSessionStatePublish,
Path: "mypath",
Query: "myquery",
Transport: nil,
BytesReceived: 123,
BytesSent: 456,
RTPPacketsReceived: 789,
RTPPacketsSent: 123,
RTPPacketsLost: 456,
RTPPacketsInError: 789,
RTPPacketsJitter: 123,
RTCPPacketsReceived: 456,
RTCPPacketsSent: 789,
RTCPPacketsInError: 456,
}},
}, nil
}
func (dummyRTSPServer) APISessionsGet(uuid.UUID) (*defs.APIRTSPSession, error) {
panic("unused")
}
func (dummyRTSPServer) APISessionsKick(uuid.UUID) error {
panic("unused")
}
type dummyRTMPServer struct{}
func (dummyRTMPServer) APIConnsList() (*defs.APIRTMPConnList, error) {
return &defs.APIRTMPConnList{
ItemCount: 1,
PageCount: 1,
Items: []*defs.APIRTMPConn{{
ID: uuid.MustParse("9a07afe4-fc07-4c9b-be6e-6255720c36d0"),
Created: time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC),
RemoteAddr: "3.3.3.3:5678",
State: defs.APIRTMPConnStateRead,
Path: "mypath",
Query: "myquery",
BytesReceived: 123,
BytesSent: 456,
}},
}, nil
}
func (dummyRTMPServer) APIConnsGet(uuid.UUID) (*defs.APIRTMPConn, error) {
panic("unused")
}
func (dummyRTMPServer) APIConnsKick(uuid.UUID) error {
panic("unused")
}
type dummyWebRTCServer struct{}
func (dummyWebRTCServer) APISessionsList() (*defs.APIWebRTCSessionList, error) {
@@ -155,6 +237,10 @@ func TestMetrics(t *testing.T) {
m.SetPathManager(&dummyPathManager{})
m.SetHLSServer(&dummyHLSServer{})
m.SetRTSPServer(&dummyRTSPServer{})
m.SetRTSPSServer(&dummyRTSPServer{})
m.SetRTMPServer(&dummyRTMPServer{})
m.SetRTMPSServer(&dummyRTMPServer{})
m.SetWebRTCServer(&dummyWebRTCServer{})
tr := &http.Transport{}
@@ -175,6 +261,68 @@ func TestMetrics(t *testing.T) {
`paths_readers{name="mypath",state="ready"} 1`+"\n"+
`hls_muxers{name="mypath"} 1`+"\n"+
`hls_muxers_bytes_sent{name="mypath"} 789`+"\n"+
`rtsp_conns{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 1`+"\n"+
`rtsp_conns_bytes_received{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 123`+"\n"+
`rtsp_conns_bytes_sent{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 456`+"\n"+
`rtsp_sessions{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 1`+"\n"+
`rtsp_sessions_bytes_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsp_sessions_bytes_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsp_sessions_rtp_packets_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsp_sessions_rtp_packets_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsp_sessions_rtp_packets_lost{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsp_sessions_rtp_packets_in_error{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsp_sessions_rtp_packets_jitter{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsp_sessions_rtcp_packets_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsp_sessions_rtcp_packets_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsp_sessions_rtcp_packets_in_error{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsps_conns{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 1`+"\n"+
`rtsps_conns_bytes_received{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 123`+"\n"+
`rtsps_conns_bytes_sent{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 456`+"\n"+
`rtsps_sessions{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 1`+"\n"+
`rtsps_sessions_bytes_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsps_sessions_bytes_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsps_sessions_rtp_packets_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsps_sessions_rtp_packets_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsps_sessions_rtp_packets_lost{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsps_sessions_rtp_packets_in_error{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsps_sessions_rtp_packets_jitter{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsps_sessions_rtcp_packets_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsps_sessions_rtcp_packets_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsps_sessions_rtcp_packets_in_error{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtmp_conns{id="9a07afe4-fc07-4c9b-be6e-6255720c36d0",`+
`path="mypath",remoteAddr="3.3.3.3:5678",state="read"} 1`+"\n"+
`rtmp_conns_bytes_received{id="9a07afe4-fc07-4c9b-be6e-6255720c36d0",`+
`path="mypath",remoteAddr="3.3.3.3:5678",state="read"} 123`+"\n"+
`rtmp_conns_bytes_sent{id="9a07afe4-fc07-4c9b-be6e-6255720c36d0",`+
`path="mypath",remoteAddr="3.3.3.3:5678",state="read"} 456`+"\n"+
`rtmps_conns{id="9a07afe4-fc07-4c9b-be6e-6255720c36d0",`+
`path="mypath",remoteAddr="3.3.3.3:5678",state="read"} 1`+"\n"+
`rtmps_conns_bytes_received{id="9a07afe4-fc07-4c9b-be6e-6255720c36d0",`+
`path="mypath",remoteAddr="3.3.3.3:5678",state="read"} 123`+"\n"+
`rtmps_conns_bytes_sent{id="9a07afe4-fc07-4c9b-be6e-6255720c36d0",`+
`path="mypath",remoteAddr="3.3.3.3:5678",state="read"} 456`+"\n"+
`webrtc_sessions{id="f47ac10b-58cc-4372-a567-0e02b2c3d479",`+
`path="mypath",remoteAddr="127.0.0.1:3455",state="read"} 1`+"\n"+
`webrtc_sessions_bytes_received{id="f47ac10b-58cc-4372-a567-0e02b2c3d479",`+
@@ -195,3 +343,109 @@ func TestMetrics(t *testing.T) {
`path="mypath",remoteAddr="127.0.0.1:3455",state="read"} 456`+"\n",
string(byts))
}
func TestMetricsFilter(t *testing.T) {
for _, ca := range []string{
"path",
"hls_muxer",
"rtsp_conn",
"rtsp_session",
// "rtsps_conn",
// "rtsps_session",
// "rtmp_conn",
// "rtmps_conn",
// "srt_conn",
// "webrtc_session",
} {
t.Run(ca, func(t *testing.T) {
m := Metrics{
Address: "localhost:9998",
AllowOrigin: "*",
ReadTimeout: conf.Duration(10 * time.Second),
AuthManager: test.NilAuthManager,
Parent: test.NilLogger,
}
err := m.Initialize()
require.NoError(t, err)
defer m.Close()
m.SetPathManager(&dummyPathManager{})
m.SetHLSServer(&dummyHLSServer{})
m.SetRTSPServer(&dummyRTSPServer{})
m.SetWebRTCServer(&dummyWebRTCServer{})
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
u := "http://localhost:9998/metrics"
switch ca {
case "path":
u += "?path=mypath"
case "hls_muxer":
u += "?hls_muxer=mypath"
case "rtsp_conn":
u += "?rtsp_conn=18294761-f9d1-4ea9-9a35-fe265b62eb41"
case "rtsp_session":
u += "?rtsp_session=124b22ce-9c34-4387-b045-44caf98049f7"
}
res, err := hc.Get(u)
require.NoError(t, err)
defer res.Body.Close()
byts, err := io.ReadAll(res.Body)
require.NoError(t, err)
switch ca {
case "path":
require.Equal(t,
`paths{name="mypath",state="ready"} 1`+"\n"+
`paths_bytes_received{name="mypath",state="ready"} 123`+"\n"+
`paths_bytes_sent{name="mypath",state="ready"} 456`+"\n"+
`paths_readers{name="mypath",state="ready"} 1`+"\n",
string(byts))
case "hls_muxer":
require.Equal(t,
`hls_muxers{name="mypath"} 1`+"\n"+
`hls_muxers_bytes_sent{name="mypath"} 789`+"\n",
string(byts))
case "rtsp_conn":
require.Equal(t,
`rtsp_conns{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 1`+"\n"+
`rtsp_conns_bytes_received{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 123`+"\n"+
`rtsp_conns_bytes_sent{id="18294761-f9d1-4ea9-9a35-fe265b62eb41"} 456`+"\n",
string(byts))
case "rtsp_session":
require.Equal(t,
`rtsp_sessions{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 1`+"\n"+
`rtsp_sessions_bytes_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsp_sessions_bytes_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsp_sessions_rtp_packets_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsp_sessions_rtp_packets_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsp_sessions_rtp_packets_lost{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsp_sessions_rtp_packets_in_error{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsp_sessions_rtp_packets_jitter{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 123`+"\n"+
`rtsp_sessions_rtcp_packets_received{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n"+
`rtsp_sessions_rtcp_packets_sent{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 789`+"\n"+
`rtsp_sessions_rtcp_packets_in_error{id="124b22ce-9c34-4387-b045-44caf98049f7",`+
`path="mypath",remoteAddr="124.5.5.5:34542",state="publish"} 456`+"\n",
string(byts))
}
})
}
}