api, metrics: add endpoints and metrics for RTSP connections (#1233)

new API endpoints:

* /v1/rtspconns/list
* /v1/rtspsconns/list

new metrics:

* rtsp_conns
* rtsps_conns
This commit is contained in:
Alessandro Ros
2022-11-09 18:31:31 +01:00
committed by GitHub
parent 16580c8985
commit 4ac175d3cc
12 changed files with 291 additions and 109 deletions

View File

@@ -443,6 +443,7 @@ Obtaining:
```
paths{name="<path_name>",state="ready"} 1
rtsp_conns 1
rtsp_sessions{state="idle"} 0
rtsp_sessions{state="read"} 0
rtsp_sessions{state="publish"} 1

View File

@@ -375,20 +375,19 @@ components:
type: string
enum: [hlsMuxer]
RTSPSession:
RTSPConn:
type: object
properties:
created:
type: string
remoteAddr:
type: string
state:
type: string
enum: [idle, read, publish]
RTSPSSession:
RTSPSession:
type: object
properties:
created:
type: string
remoteAddr:
type: string
state:
@@ -433,6 +432,22 @@ components:
additionalProperties:
$ref: '#/components/schemas/Path'
RTSPConnsList:
type: object
properties:
items:
type: object
additionalProperties:
$ref: '#/components/schemas/RTSPConn'
RTSPSConnsList:
type: object
properties:
items:
type: object
additionalProperties:
$ref: '#/components/schemas/RTSPConn'
RTSPSessionsList:
type: object
properties:
@@ -441,14 +456,6 @@ components:
additionalProperties:
$ref: '#/components/schemas/RTSPSession'
RTSPSSessionsList:
type: object
properties:
items:
type: object
additionalProperties:
$ref: '#/components/schemas/RTSPSSession'
RTMPConnsList:
type: object
properties:
@@ -599,6 +606,23 @@ paths:
'500':
description: internal server error.
/v1/rtspconns/list:
get:
operationId: rtspConnsList
summary: returns all active RTSP connections.
description: ''
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTSPConnsList'
'400':
description: invalid request.
'500':
description: internal server error.
/v1/rtspsessions/list:
get:
operationId: rtspSessionsList
@@ -616,6 +640,23 @@ paths:
'500':
description: internal server error.
/v1/rtspsconns/list:
get:
operationId: rtspsConnsList
summary: returns all active RTSPS connections.
description: ''
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTSPConnsList'
'400':
description: invalid request.
'500':
description: internal server error.
/v1/rtspsessions/kick/{id}:
post:
operationId: rtspSessionsKick
@@ -647,7 +688,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/RTSPSSessionsList'
$ref: '#/components/schemas/RTSPSessionsList'
'400':
description: invalid request.
'500':

View File

@@ -83,21 +83,22 @@ func loadConfPathData(ctx *gin.Context) (interface{}, error) {
}
type apiPathManager interface {
apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes
apiPathsList() pathAPIPathsListRes
}
type apiRTSPServer interface {
apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes
apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes
apiConnsList() rtspServerAPIConnsListRes
apiSessionsList() rtspServerAPISessionsListRes
apiSessionsKick(string) rtspServerAPISessionsKickRes
}
type apiRTMPServer interface {
apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes
apiConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes
apiConnsList() rtmpServerAPIConnsListRes
apiConnsKick(id string) rtmpServerAPIConnsKickRes
}
type apiHLSServer interface {
apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes
apiHLSMuxersList() hlsServerAPIMuxersListRes
}
type apiParent interface {
@@ -162,11 +163,13 @@ func newAPI(
group.GET("/v1/paths/list", a.onPathsList)
if !interfaceIsEmpty(a.rtspServer) {
group.GET("/v1/rtspconns/list", a.onRTSPConnsList)
group.GET("/v1/rtspsessions/list", a.onRTSPSessionsList)
group.POST("/v1/rtspsessions/kick/:id", a.onRTSPSessionsKick)
}
if !interfaceIsEmpty(a.rtspsServer) {
group.GET("/v1/rtspsconns/list", a.onRTSPSConnsList)
group.GET("/v1/rtspssessions/list", a.onRTSPSSessionsList)
group.POST("/v1/rtspssessions/kick/:id", a.onRTSPSSessionsKick)
}
@@ -382,7 +385,17 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
}
func (a *api) onPathsList(ctx *gin.Context) {
res := a.pathManager.apiPathsList(pathAPIPathsListReq{})
res := a.pathManager.apiPathsList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.data)
}
func (a *api) onRTSPConnsList(ctx *gin.Context) {
res := a.rtspServer.apiConnsList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
@@ -392,7 +405,7 @@ func (a *api) onPathsList(ctx *gin.Context) {
}
func (a *api) onRTSPSessionsList(ctx *gin.Context) {
res := a.rtspServer.apiSessionsList(rtspServerAPISessionsListReq{})
res := a.rtspServer.apiSessionsList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
@@ -404,7 +417,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) {
func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtspServer.apiSessionsKick(rtspServerAPISessionsKickReq{id: id})
res := a.rtspServer.apiSessionsKick(id)
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
@@ -413,8 +426,18 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK)
}
func (a *api) onRTSPSConnsList(ctx *gin.Context) {
res := a.rtspsServer.apiConnsList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.data)
}
func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
res := a.rtspsServer.apiSessionsList(rtspServerAPISessionsListReq{})
res := a.rtspsServer.apiSessionsList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
@@ -426,7 +449,7 @@ func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtspsServer.apiSessionsKick(rtspServerAPISessionsKickReq{id: id})
res := a.rtspsServer.apiSessionsKick(id)
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
@@ -436,7 +459,7 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
}
func (a *api) onRTMPConnsList(ctx *gin.Context) {
res := a.rtmpServer.apiConnsList(rtmpServerAPIConnsListReq{})
res := a.rtmpServer.apiConnsList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
@@ -448,7 +471,7 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) {
func (a *api) onRTMPConnsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtmpServer.apiConnsKick(rtmpServerAPIConnsKickReq{id: id})
res := a.rtmpServer.apiConnsKick(id)
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
@@ -458,7 +481,7 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
}
func (a *api) onRTMPSConnsList(ctx *gin.Context) {
res := a.rtmpsServer.apiConnsList(rtmpServerAPIConnsListReq{})
res := a.rtmpsServer.apiConnsList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
@@ -470,7 +493,7 @@ func (a *api) onRTMPSConnsList(ctx *gin.Context) {
func (a *api) onRTMPSConnsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtmpsServer.apiConnsKick(rtmpServerAPIConnsKickReq{id: id})
res := a.rtmpsServer.apiConnsKick(id)
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
@@ -480,7 +503,7 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) {
}
func (a *api) onHLSMuxersList(ctx *gin.Context) {
res := a.hlsServer.apiHLSMuxersList(hlsServerAPIMuxersListReq{})
res := a.hlsServer.apiHLSMuxersList()
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return

View File

@@ -370,8 +370,10 @@ func TestAPIProtocolSpecificList(t *testing.T) {
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp",
"rtsps",
"rtsp conns",
"rtsp sessions",
"rtsps conns",
"rtsps sessions",
"rtmp",
"rtmps",
"hls",
@@ -380,7 +382,7 @@ func TestAPIProtocolSpecificList(t *testing.T) {
conf := "api: yes\n"
switch ca {
case "rtsps":
case "rtsps conns", "rtsps sessions":
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
@@ -406,7 +408,7 @@ func TestAPIProtocolSpecificList(t *testing.T) {
}
switch ca {
case "rtsp":
case "rtsp conns", "rtsp sessions":
source := gortsplib.Client{}
err := source.StartPublishing("rtsp://localhost:8554/mypath",
@@ -414,7 +416,7 @@ func TestAPIProtocolSpecificList(t *testing.T) {
require.NoError(t, err)
defer source.Close()
case "rtsps":
case "rtsps conns", "rtsps sessions":
source := gortsplib.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}
@@ -478,13 +480,19 @@ func TestAPIProtocolSpecificList(t *testing.T) {
}
switch ca {
case "rtsp", "rtsps", "rtmp", "rtmps":
case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps":
var pa string
switch ca {
case "rtsp":
case "rtsp conns":
pa = "rtspconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtsps":
case "rtsps conns":
pa = "rtspsconns"
case "rtsps sessions":
pa = "rtspssessions"
case "rtmp":
@@ -507,7 +515,9 @@ func TestAPIProtocolSpecificList(t *testing.T) {
firstID = k
}
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out.Items[firstID].State)
}
case "hls":
var out struct {

View File

@@ -163,7 +163,7 @@ func TestCorePathAutoDeletion(t *testing.T) {
}
}()
res := p.pathManager.apiPathsList(pathAPIPathsListReq{})
res := p.pathManager.apiPathsList()
require.NoError(t, res.err)
require.Equal(t, 0, len(res.data.Items))

View File

@@ -392,8 +392,11 @@ func (s *hlsServer) pathSourceNotReady(pa *path) {
}
// apiHLSMuxersList is called by api.
func (s *hlsServer) apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes {
req.res = make(chan hlsServerAPIMuxersListRes)
func (s *hlsServer) apiHLSMuxersList() hlsServerAPIMuxersListRes {
req := hlsServerAPIMuxersListReq{
res: make(chan hlsServerAPIMuxersListRes),
}
select {
case s.chAPIMuxerList <- req:
res := <-req.res

View File

@@ -18,19 +18,20 @@ func metric(key string, value int64) string {
}
type metricsPathManager interface {
apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes
apiPathsList() pathAPIPathsListRes
}
type metricsRTSPServer interface {
apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes
apiConnsList() rtspServerAPIConnsListRes
apiSessionsList() rtspServerAPISessionsListRes
}
type metricsRTMPServer interface {
apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes
apiConnsList() rtmpServerAPIConnsListRes
}
type metricsHLSServer interface {
apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes
apiHLSMuxersList() hlsServerAPIMuxersListRes
}
type metricsParent interface {
@@ -90,7 +91,7 @@ func (m *metrics) log(level logger.Level, format string, args ...interface{}) {
func (m *metrics) onMetrics(ctx *gin.Context) {
out := ""
res := m.pathManager.apiPathsList(pathAPIPathsListReq{})
res := m.pathManager.apiPathsList()
if res.err == nil {
for name, p := range res.data.Items {
if p.SourceReady {
@@ -102,7 +103,15 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
if !interfaceIsEmpty(m.rtspServer) {
res := m.rtspServer.apiSessionsList(rtspServerAPISessionsListReq{})
func() {
res := m.rtspServer.apiConnsList()
if res.err == nil {
out += metric("rtsp_conns", int64(len(res.data.Items)))
}
}()
func() {
res := m.rtspServer.apiSessionsList()
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
@@ -126,10 +135,19 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
out += metric("rtsp_sessions{state=\"publish\"}",
publishCount)
}
}()
}
if !interfaceIsEmpty(m.rtspsServer) {
res := m.rtspsServer.apiSessionsList(rtspServerAPISessionsListReq{})
func() {
res := m.rtspsServer.apiConnsList()
if res.err == nil {
out += metric("rtsps_conns", int64(len(res.data.Items)))
}
}()
func() {
res := m.rtspsServer.apiSessionsList()
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
@@ -153,10 +171,11 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
out += metric("rtsps_sessions{state=\"publish\"}",
publishCount)
}
}()
}
if !interfaceIsEmpty(m.rtmpServer) {
res := m.rtmpServer.apiConnsList(rtmpServerAPIConnsListReq{})
res := m.rtmpServer.apiConnsList()
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
@@ -183,7 +202,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
if !interfaceIsEmpty(m.hlsServer) {
res := m.hlsServer.apiHLSMuxersList(hlsServerAPIMuxersListReq{})
res := m.hlsServer.apiHLSMuxersList()
if res.err == nil {
for name := range res.data.Items {
out += metric("hls_muxers{name=\""+name+"\"}", 1)

View File

@@ -102,9 +102,11 @@ func TestMetrics(t *testing.T) {
"rtmp_conns{state=\"idle\"}": "0",
"rtmp_conns{state=\"publish\"}": "1",
"rtmp_conns{state=\"read\"}": "0",
"rtsp_conns": "1",
"rtsp_sessions{state=\"idle\"}": "0",
"rtsp_sessions{state=\"publish\"}": "1",
"rtsp_sessions{state=\"read\"}": "0",
"rtsps_conns": "0",
"rtsps_sessions{state=\"idle\"}": "0",
"rtsps_sessions{state=\"publish\"}": "0",
"rtsps_sessions{state=\"read\"}": "0",

View File

@@ -407,8 +407,11 @@ func (pm *pathManager) hlsServerSet(s pathManagerHLSServer) {
}
// apiPathsList is called by api.
func (pm *pathManager) apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes {
req.res = make(chan pathAPIPathsListRes)
func (pm *pathManager) apiPathsList() pathAPIPathsListRes {
req := pathAPIPathsListReq{
res: make(chan pathAPIPathsListRes),
}
select {
case pm.chAPIPathsList <- req:
res := <-req.res

View File

@@ -314,8 +314,11 @@ func (s *rtmpServer) connClose(c *rtmpConn) {
}
// apiConnsList is called by api.
func (s *rtmpServer) apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes {
req.res = make(chan rtmpServerAPIConnsListRes)
func (s *rtmpServer) apiConnsList() rtmpServerAPIConnsListRes {
req := rtmpServerAPIConnsListReq{
res: make(chan rtmpServerAPIConnsListRes),
}
select {
case s.chAPIConnsList <- req:
return <-req.res
@@ -326,8 +329,12 @@ func (s *rtmpServer) apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPICo
}
// apiConnsKick is called by api.
func (s *rtmpServer) apiConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes {
req.res = make(chan rtmpServerAPIConnsKickRes)
func (s *rtmpServer) apiConnsKick(id string) rtmpServerAPIConnsKickRes {
req := rtmpServerAPIConnsKickReq{
id: id,
res: make(chan rtmpServerAPIConnsKickRes),
}
select {
case s.chAPIConnsKick <- req:
return <-req.res

View File

@@ -25,6 +25,7 @@ type rtspConnParent interface {
}
type rtspConn struct {
id string
externalAuthenticationURL string
rtspAddress string
authMethods []headers.AuthMethod
@@ -36,6 +37,7 @@ type rtspConn struct {
conn *gortsplib.ServerConn
parent rtspConnParent
created time.Time
onConnectCmd *externalcmd.Cmd
authUser string
authPass string
@@ -44,6 +46,7 @@ type rtspConn struct {
}
func newRTSPConn(
id string,
externalAuthenticationURL string,
rtspAddress string,
authMethods []headers.AuthMethod,
@@ -56,6 +59,7 @@ func newRTSPConn(
parent rtspConnParent,
) *rtspConn {
c := &rtspConn{
id: id,
externalAuthenticationURL: externalAuthenticationURL,
rtspAddress: rtspAddress,
authMethods: authMethods,
@@ -66,6 +70,7 @@ func newRTSPConn(
pathManager: pathManager,
conn: conn,
parent: parent,
created: time.Now(),
}
c.log(logger.Info, "opened")
@@ -98,6 +103,10 @@ func (c *rtspConn) Conn() *gortsplib.ServerConn {
return c.conn
}
func (c *rtspConn) remoteAddr() net.Addr {
return c.conn.NetConn().RemoteAddr()
}
func (c *rtspConn) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
}

View File

@@ -20,6 +20,20 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
)
type rtspServerAPIConnsListItem struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
}
type rtspServerAPIConnsListData struct {
Items map[string]rtspServerAPIConnsListItem `json:"items"`
}
type rtspServerAPIConnsListRes struct {
data *rtspServerAPIConnsListData
err error
}
type rtspServerAPISessionsListItem struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
@@ -35,16 +49,10 @@ type rtspServerAPISessionsListRes struct {
err error
}
type rtspServerAPISessionsListReq struct{}
type rtspServerAPISessionsKickRes struct {
err error
}
type rtspServerAPISessionsKickReq struct {
id string
}
type rtspServerParent interface {
Log(logger.Level, string, ...interface{})
}
@@ -259,9 +267,40 @@ func (s *rtspServer) newSessionID() (string, error) {
}
}
func (s *rtspServer) newConnID() (string, error) {
for {
b := make([]byte, 4)
_, err := rand.Read(b)
if err != nil {
return "", err
}
u := uint32(b[3])<<24 | uint32(b[2])<<16 | uint32(b[1])<<8 | uint32(b[0])
u %= 899999999
u += 100000000
id := strconv.FormatUint(uint64(u), 10)
alreadyPresent := func() bool {
for _, c := range s.conns {
if c.id == id {
return true
}
}
return false
}()
if !alreadyPresent {
return id, nil
}
}
}
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen.
func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
s.mutex.Lock()
id, _ := s.newConnID()
c := newRTSPConn(
id,
s.externalAuthenticationURL,
s.rtspAddress,
s.authMethods,
@@ -272,9 +311,9 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
s.pathManager,
ctx.Conn,
s)
s.mutex.Lock()
s.conns[ctx.Conn] = c
s.mutex.Unlock()
ctx.Conn.SetUserData(c)
}
@@ -380,8 +419,33 @@ func (s *rtspServer) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx)
se.onDecodeError(ctx)
}
// apiConnsList is called by api and metrics.
func (s *rtspServer) apiConnsList() rtspServerAPIConnsListRes {
select {
case <-s.ctx.Done():
return rtspServerAPIConnsListRes{err: fmt.Errorf("terminated")}
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
data := &rtspServerAPIConnsListData{
Items: make(map[string]rtspServerAPIConnsListItem),
}
for _, c := range s.conns {
data.Items[c.id] = rtspServerAPIConnsListItem{
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
}
}
return rtspServerAPIConnsListRes{data: data}
}
// apiSessionsList is called by api and metrics.
func (s *rtspServer) apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes {
func (s *rtspServer) apiSessionsList() rtspServerAPISessionsListRes {
select {
case <-s.ctx.Done():
return rtspServerAPISessionsListRes{err: fmt.Errorf("terminated")}
@@ -418,7 +482,7 @@ func (s *rtspServer) apiSessionsList(req rtspServerAPISessionsListReq) rtspServe
}
// apiSessionsKick is called by api.
func (s *rtspServer) apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes {
func (s *rtspServer) apiSessionsKick(id string) rtspServerAPISessionsKickRes {
select {
case <-s.ctx.Done():
return rtspServerAPISessionsKickRes{err: fmt.Errorf("terminated")}
@@ -429,7 +493,7 @@ func (s *rtspServer) apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServe
defer s.mutex.RUnlock()
for key, se := range s.sessions {
if se.id == req.id {
if se.id == id {
se.close()
delete(s.sessions, key)
se.onClose(liberrors.ErrServerTerminated{})