diff --git a/internal/api/api.go b/internal/api/api.go index beb64b78..2b9c8f2c 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -77,48 +77,6 @@ func recordingsOfPath( return ret } -// PathManager contains methods used by the API and Metrics server. -type PathManager interface { - APIPathsList() (*defs.APIPathList, error) - APIPathsGet(string) (*defs.APIPath, error) -} - -// HLSServer contains methods used by the API and Metrics server. -type HLSServer interface { - APIMuxersList() (*defs.APIHLSMuxerList, error) - APIMuxersGet(string) (*defs.APIHLSMuxer, error) -} - -// RTSPServer contains methods used by the API and Metrics server. -type RTSPServer interface { - APIConnsList() (*defs.APIRTSPConnsList, error) - APIConnsGet(uuid.UUID) (*defs.APIRTSPConn, error) - APISessionsList() (*defs.APIRTSPSessionList, error) - APISessionsGet(uuid.UUID) (*defs.APIRTSPSession, error) - APISessionsKick(uuid.UUID) error -} - -// RTMPServer contains methods used by the API and Metrics server. -type RTMPServer interface { - APIConnsList() (*defs.APIRTMPConnList, error) - APIConnsGet(uuid.UUID) (*defs.APIRTMPConn, error) - APIConnsKick(uuid.UUID) error -} - -// SRTServer contains methods used by the API and Metrics server. -type SRTServer interface { - APIConnsList() (*defs.APISRTConnList, error) - APIConnsGet(uuid.UUID) (*defs.APISRTConn, error) - APIConnsKick(uuid.UUID) error -} - -// WebRTCServer contains methods used by the API and Metrics server. -type WebRTCServer interface { - APISessionsList() (*defs.APIWebRTCSessionList, error) - APISessionsGet(uuid.UUID) (*defs.APIWebRTCSession, error) - APISessionsKick(uuid.UUID) error -} - type apiAuthManager interface { Authenticate(req *auth.Request) error } @@ -139,14 +97,14 @@ type API struct { ReadTimeout conf.Duration Conf *conf.Conf AuthManager apiAuthManager - PathManager PathManager - RTSPServer RTSPServer - RTSPSServer RTSPServer - RTMPServer RTMPServer - RTMPSServer RTMPServer - HLSServer HLSServer - WebRTCServer WebRTCServer - SRTServer SRTServer + PathManager defs.APIPathManager + RTSPServer defs.APIRTSPServer + RTSPSServer defs.APIRTSPServer + RTMPServer defs.APIRTMPServer + RTMPSServer defs.APIRTMPServer + HLSServer defs.APIHLSServer + WebRTCServer defs.APIWebRTCServer + SRTServer defs.APISRTServer Parent apiParent httpServer *httpp.Server diff --git a/internal/core/core.go b/internal/core/core.go index 4adbdb63..0c6d979c 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -361,13 +361,10 @@ func (p *Core) createResources(initial bool) error { udpMaxPayloadSize: p.conf.UDPMaxPayloadSize, pathConfs: p.conf.Paths, externalCmdPool: p.externalCmdPool, + metrics: p.metrics, parent: p, } p.pathManager.initialize() - - if p.metrics != nil { - p.metrics.SetPathManager(p.pathManager) - } } if p.conf.RTSP && @@ -399,6 +396,7 @@ func (p *Core) createResources(initial bool) error { RunOnConnectRestart: p.conf.RunOnConnectRestart, RunOnDisconnect: p.conf.RunOnDisconnect, ExternalCmdPool: p.externalCmdPool, + Metrics: p.metrics, PathManager: p.pathManager, Parent: p, } @@ -407,10 +405,6 @@ func (p *Core) createResources(initial bool) error { return err } p.rtspServer = i - - if p.metrics != nil { - p.metrics.SetRTSPServer(p.rtspServer) - } } if p.conf.RTSP && @@ -439,6 +433,7 @@ func (p *Core) createResources(initial bool) error { RunOnConnectRestart: p.conf.RunOnConnectRestart, RunOnDisconnect: p.conf.RunOnDisconnect, ExternalCmdPool: p.externalCmdPool, + Metrics: p.metrics, PathManager: p.pathManager, Parent: p, } @@ -447,10 +442,6 @@ func (p *Core) createResources(initial bool) error { return err } p.rtspsServer = i - - if p.metrics != nil { - p.metrics.SetRTSPSServer(p.rtspsServer) - } } if p.conf.RTMP && @@ -469,6 +460,7 @@ func (p *Core) createResources(initial bool) error { RunOnConnectRestart: p.conf.RunOnConnectRestart, RunOnDisconnect: p.conf.RunOnDisconnect, ExternalCmdPool: p.externalCmdPool, + Metrics: p.metrics, PathManager: p.pathManager, Parent: p, } @@ -477,10 +469,6 @@ func (p *Core) createResources(initial bool) error { return err } p.rtmpServer = i - - if p.metrics != nil { - p.metrics.SetRTMPServer(p.rtmpServer) - } } if p.conf.RTMP && @@ -499,6 +487,7 @@ func (p *Core) createResources(initial bool) error { RunOnConnectRestart: p.conf.RunOnConnectRestart, RunOnDisconnect: p.conf.RunOnDisconnect, ExternalCmdPool: p.externalCmdPool, + Metrics: p.metrics, PathManager: p.pathManager, Parent: p, } @@ -507,10 +496,6 @@ func (p *Core) createResources(initial bool) error { return err } p.rtmpsServer = i - - if p.metrics != nil { - p.metrics.SetRTMPSServer(p.rtmpsServer) - } } if p.conf.HLS && @@ -531,6 +516,7 @@ func (p *Core) createResources(initial bool) error { Directory: p.conf.HLSDirectory, ReadTimeout: p.conf.ReadTimeout, MuxerCloseAfter: p.conf.HLSMuxerCloseAfter, + Metrics: p.metrics, PathManager: p.pathManager, Parent: p, } @@ -539,12 +525,6 @@ func (p *Core) createResources(initial bool) error { return err } p.hlsServer = i - - p.pathManager.setHLSServer(p.hlsServer) - - if p.metrics != nil { - p.metrics.SetHLSServer(p.hlsServer) - } } if p.conf.WebRTC && @@ -567,6 +547,7 @@ func (p *Core) createResources(initial bool) error { STUNGatherTimeout: p.conf.WebRTCSTUNGatherTimeout, TrackGatherTimeout: p.conf.WebRTCTrackGatherTimeout, ExternalCmdPool: p.externalCmdPool, + Metrics: p.metrics, PathManager: p.pathManager, Parent: p, } @@ -575,10 +556,6 @@ func (p *Core) createResources(initial bool) error { return err } p.webRTCServer = i - - if p.metrics != nil { - p.metrics.SetWebRTCServer(p.webRTCServer) - } } if p.conf.SRT && @@ -593,6 +570,7 @@ func (p *Core) createResources(initial bool) error { RunOnConnectRestart: p.conf.RunOnConnectRestart, RunOnDisconnect: p.conf.RunOnDisconnect, ExternalCmdPool: p.externalCmdPool, + Metrics: p.metrics, PathManager: p.pathManager, Parent: p, } @@ -601,10 +579,6 @@ func (p *Core) createResources(initial bool) error { return err } p.srtServer = i - - if p.metrics != nil { - p.metrics.SetSRTServer(p.srtServer) - } } if p.conf.API && @@ -887,75 +861,41 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { } if closeSRTServer && p.srtServer != nil { - if p.metrics != nil { - p.metrics.SetSRTServer(nil) - } - p.srtServer.Close() p.srtServer = nil } if closeWebRTCServer && p.webRTCServer != nil { - if p.metrics != nil { - p.metrics.SetWebRTCServer(nil) - } - p.webRTCServer.Close() p.webRTCServer = nil } if closeHLSServer && p.hlsServer != nil { - if p.metrics != nil { - p.metrics.SetHLSServer(nil) - } - - p.pathManager.setHLSServer(nil) - p.hlsServer.Close() p.hlsServer = nil } if closeRTMPSServer && p.rtmpsServer != nil { - if p.metrics != nil { - p.metrics.SetRTMPSServer(nil) - } - p.rtmpsServer.Close() p.rtmpsServer = nil } if closeRTMPServer && p.rtmpServer != nil { - if p.metrics != nil { - p.metrics.SetRTMPServer(nil) - } - p.rtmpServer.Close() p.rtmpServer = nil } if closeRTSPSServer && p.rtspsServer != nil { - if p.metrics != nil { - p.metrics.SetRTSPSServer(nil) - } - p.rtspsServer.Close() p.rtspsServer = nil } if closeRTSPServer && p.rtspServer != nil { - if p.metrics != nil { - p.metrics.SetRTSPServer(nil) - } - p.rtspServer.Close() p.rtspServer = nil } if closePathManager && p.pathManager != nil { - if p.metrics != nil { - p.metrics.SetPathManager(nil) - } - p.pathManager.close() p.pathManager = nil } diff --git a/internal/core/path.go b/internal/core/path.go index 45b7280e..da4d7ea6 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -161,6 +161,10 @@ func (pa *path) Name() string { return pa.name } +func (pa *path) isReady() bool { + return pa.stream != nil +} + func (pa *path) run() { defer close(pa.done) defer pa.wg.Done() @@ -568,28 +572,28 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { v := pa.source.APISourceDescribe() return &v }(), - Ready: pa.stream != nil, + Ready: pa.isReady(), ReadyTime: func() *time.Time { - if pa.stream == nil { + if !pa.isReady() { return nil } v := pa.readyTime return &v }(), Tracks: func() []string { - if pa.stream == nil { + if !pa.isReady() { return []string{} } return defs.MediasToCodecs(pa.stream.Desc.Medias) }(), BytesReceived: func() uint64 { - if pa.stream == nil { + if !pa.isReady() { return 0 } return pa.stream.BytesReceived() }(), BytesSent: func() uint64 { - if pa.stream == nil { + if !pa.isReady() { return 0 } return pa.stream.BytesSent() diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index bbd1bb7b..fe445948 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -11,6 +11,8 @@ import ( "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/metrics" + "github.com/bluenviron/mediamtx/internal/servers/hls" "github.com/bluenviron/mediamtx/internal/stream" ) @@ -39,9 +41,18 @@ func pathConfCanBeUpdated(oldPathConf *conf.Path, newPathConf *conf.Path) bool { return newPathConf.Equal(clone) } -type pathManagerHLSServer interface { - PathReady(defs.Path) - PathNotReady(defs.Path) +type pathSetHLSServerRes struct { + readyPaths []defs.Path +} + +type pathSetHLSServerReq struct { + s *hls.Server + res chan pathSetHLSServerRes +} + +type pathData struct { + path *path + ready bool } type pathManagerParent interface { @@ -58,18 +69,19 @@ type pathManager struct { udpMaxPayloadSize int pathConfs map[string]*conf.Path externalCmdPool *externalcmd.Pool + metrics *metrics.Metrics parent pathManagerParent ctx context.Context ctxCancel func() wg sync.WaitGroup - hlsManager pathManagerHLSServer - paths map[string]*path + hlsServer *hls.Server + paths map[string]*pathData pathsByConf map[string]map[*path]struct{} // in chReloadConf chan map[string]*conf.Path - chSetHLSServer chan pathManagerHLSServer + chSetHLSServer chan pathSetHLSServerReq chClosePath chan *path chPathReady chan *path chPathNotReady chan *path @@ -86,10 +98,10 @@ func (pm *pathManager) initialize() { pm.ctx = ctx pm.ctxCancel = ctxCancel - pm.paths = make(map[string]*path) + pm.paths = make(map[string]*pathData) pm.pathsByConf = make(map[string]map[*path]struct{}) pm.chReloadConf = make(chan map[string]*conf.Path) - pm.chSetHLSServer = make(chan pathManagerHLSServer) + pm.chSetHLSServer = make(chan pathSetHLSServerReq) pm.chClosePath = make(chan *path) pm.chPathReady = make(chan *path) pm.chPathNotReady = make(chan *path) @@ -110,10 +122,19 @@ func (pm *pathManager) initialize() { pm.wg.Add(1) go pm.run() + + if pm.metrics != nil { + pm.metrics.SetPathManager(pm) + } } func (pm *pathManager) close() { pm.Log(logger.Debug, "path manager is shutting down") + + if pm.metrics != nil { + pm.metrics.SetPathManager(nil) + } + pm.ctxCancel() pm.wg.Wait() } @@ -132,8 +153,9 @@ outer: case newPaths := <-pm.chReloadConf: pm.doReloadConf(newPaths) - case m := <-pm.chSetHLSServer: - pm.doSetHLSServer(m) + case req := <-pm.chSetHLSServer: + readyPaths := pm.doSetHLSServer(req.s) + req.res <- pathSetHLSServerRes{readyPaths: readyPaths} case pa := <-pm.chClosePath: pm.doClosePath(pa) @@ -207,26 +229,48 @@ func (pm *pathManager) doReloadConf(newPaths map[string]*conf.Path) { } } -func (pm *pathManager) doSetHLSServer(m pathManagerHLSServer) { - pm.hlsManager = m +func (pm *pathManager) doSetHLSServer(m *hls.Server) []defs.Path { + pm.hlsServer = m + + var ret []defs.Path + + for _, pd := range pm.paths { + if pd.ready { + ret = append(ret, pd.path) + } + } + + return ret } func (pm *pathManager) doClosePath(pa *path) { - if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa { + if pd, ok := pm.paths[pa.name]; !ok || pd.path != pa { return } pm.removePath(pa) } func (pm *pathManager) doPathReady(pa *path) { - if pm.hlsManager != nil { - pm.hlsManager.PathReady(pa) + if pd, ok := pm.paths[pa.name]; !ok || pd.path != pa { + return + } + + pm.paths[pa.name].ready = true + + if pm.hlsServer != nil { + pm.hlsServer.PathReady(pa) } } func (pm *pathManager) doPathNotReady(pa *path) { - if pm.hlsManager != nil { - pm.hlsManager.PathNotReady(pa) + if pd, ok := pm.paths[pa.name]; !ok || pd.path != pa { + return + } + + pm.paths[pa.name].ready = false + + if pm.hlsServer != nil { + pm.hlsServer.PathNotReady(pa) } } @@ -264,7 +308,8 @@ func (pm *pathManager) doDescribe(req defs.PathDescribeReq) { pm.createPath(pathConf, req.AccessRequest.Name, pathMatches) } - req.Res <- defs.PathDescribeRes{Path: pm.paths[req.AccessRequest.Name]} + pd := pm.paths[req.AccessRequest.Name] + req.Res <- defs.PathDescribeRes{Path: pd.path} } func (pm *pathManager) doAddReader(req defs.PathAddReaderReq) { @@ -287,7 +332,8 @@ func (pm *pathManager) doAddReader(req defs.PathAddReaderReq) { pm.createPath(pathConf, req.AccessRequest.Name, pathMatches) } - req.Res <- defs.PathAddReaderRes{Path: pm.paths[req.AccessRequest.Name]} + pd := pm.paths[req.AccessRequest.Name] + req.Res <- defs.PathAddReaderRes{Path: pd.path} } func (pm *pathManager) doAddPublisher(req defs.PathAddPublisherReq) { @@ -310,27 +356,28 @@ func (pm *pathManager) doAddPublisher(req defs.PathAddPublisherReq) { pm.createPath(pathConf, req.AccessRequest.Name, pathMatches) } - req.Res <- defs.PathAddPublisherRes{Path: pm.paths[req.AccessRequest.Name]} + pd := pm.paths[req.AccessRequest.Name] + req.Res <- defs.PathAddPublisherRes{Path: pd.path} } func (pm *pathManager) doAPIPathsList(req pathAPIPathsListReq) { paths := make(map[string]*path) - for name, pa := range pm.paths { - paths[name] = pa + for name, pd := range pm.paths { + paths[name] = pd.path } req.res <- pathAPIPathsListRes{paths: paths} } func (pm *pathManager) doAPIPathsGet(req pathAPIPathsGetReq) { - path, ok := pm.paths[req.name] + pd, ok := pm.paths[req.name] if !ok { req.res <- pathAPIPathsGetRes{err: conf.ErrPathNotFound} return } - req.res <- pathAPIPathsGetRes{path: path} + req.res <- pathAPIPathsGetRes{path: pd.path} } func (pm *pathManager) createPath( @@ -355,7 +402,7 @@ func (pm *pathManager) createPath( } pa.initialize() - pm.paths[name] = pa + pm.paths[name] = &pathData{path: pa} if _, ok := pm.pathsByConf[pathConf.Name]; !ok { pm.pathsByConf[pathConf.Name] = make(map[*path]struct{}) @@ -476,11 +523,20 @@ func (pm *pathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream. } } -// setHLSServer is called by hlsManager. -func (pm *pathManager) setHLSServer(s pathManagerHLSServer) { +// SetHLSServer is called by hls.Server. +func (pm *pathManager) SetHLSServer(s *hls.Server) []defs.Path { + req := pathSetHLSServerReq{ + s: s, + res: make(chan pathSetHLSServerRes), + } + select { - case pm.chSetHLSServer <- s: + case pm.chSetHLSServer <- req: + res := <-req.res + return res.readyPaths + case <-pm.ctx.Done(): + return nil } } diff --git a/internal/defs/api.go b/internal/defs/api.go index 9b3ae361..05830230 100644 --- a/internal/defs/api.go +++ b/internal/defs/api.go @@ -8,6 +8,48 @@ import ( "github.com/bluenviron/mediamtx/internal/conf" ) +// APIPathManager contains methods used by the API and Metrics server. +type APIPathManager interface { + APIPathsList() (*APIPathList, error) + APIPathsGet(string) (*APIPath, error) +} + +// APIHLSServer contains methods used by the API and Metrics server. +type APIHLSServer interface { + APIMuxersList() (*APIHLSMuxerList, error) + APIMuxersGet(string) (*APIHLSMuxer, error) +} + +// APIRTSPServer contains methods used by the API and Metrics server. +type APIRTSPServer interface { + APIConnsList() (*APIRTSPConnsList, error) + APIConnsGet(uuid.UUID) (*APIRTSPConn, error) + APISessionsList() (*APIRTSPSessionList, error) + APISessionsGet(uuid.UUID) (*APIRTSPSession, error) + APISessionsKick(uuid.UUID) error +} + +// APIRTMPServer contains methods used by the API and Metrics server. +type APIRTMPServer interface { + APIConnsList() (*APIRTMPConnList, error) + APIConnsGet(uuid.UUID) (*APIRTMPConn, error) + APIConnsKick(uuid.UUID) error +} + +// APISRTServer contains methods used by the API and Metrics server. +type APISRTServer interface { + APIConnsList() (*APISRTConnList, error) + APIConnsGet(uuid.UUID) (*APISRTConn, error) + APIConnsKick(uuid.UUID) error +} + +// APIWebRTCServer contains methods used by the API and Metrics server. +type APIWebRTCServer interface { + APISessionsList() (*APIWebRTCSessionList, error) + APISessionsGet(uuid.UUID) (*APIWebRTCSession, error) + APISessionsKick(uuid.UUID) error +} + // APIError is a generic error. type APIError struct { Error string `json:"error"` diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 3b710f2a..e2069a28 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -12,9 +12,9 @@ import ( "github.com/gin-gonic/gin" - "github.com/bluenviron/mediamtx/internal/api" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/httpp" "github.com/bluenviron/mediamtx/internal/restrictnetwork" @@ -54,14 +54,14 @@ type Metrics struct { httpServer *httpp.Server mutex sync.Mutex - pathManager api.PathManager - rtspServer api.RTSPServer - rtspsServer api.RTSPServer - rtmpServer api.RTMPServer - rtmpsServer api.RTMPServer - srtServer api.SRTServer - hlsManager api.HLSServer - webRTCServer api.WebRTCServer + pathManager defs.APIPathManager + rtspServer defs.APIRTSPServer + rtspsServer defs.APIRTSPServer + rtmpServer defs.APIRTMPServer + rtmpsServer defs.APIRTMPServer + srtServer defs.APISRTServer + hlsServer defs.APIHLSServer + webRTCServer defs.APIWebRTCServer } // Initialize initializes metrics. @@ -166,8 +166,8 @@ func (m *Metrics) onMetrics(ctx *gin.Context) { out += metric("paths", "", 0) } - if !interfaceIsEmpty(m.hlsManager) { - data, err := m.hlsManager.APIMuxersList() + if !interfaceIsEmpty(m.hlsServer) { + data, err := m.hlsServer.APIMuxersList() if err == nil && len(data.Items) != 0 { for _, i := range data.Items { tags := "{name=\"" + i.Path + "\"}" @@ -447,56 +447,56 @@ func (m *Metrics) onMetrics(ctx *gin.Context) { } // SetPathManager is called by core. -func (m *Metrics) SetPathManager(s api.PathManager) { +func (m *Metrics) SetPathManager(s defs.APIPathManager) { m.mutex.Lock() defer m.mutex.Unlock() m.pathManager = s } // SetHLSServer is called by core. -func (m *Metrics) SetHLSServer(s api.HLSServer) { +func (m *Metrics) SetHLSServer(s defs.APIHLSServer) { m.mutex.Lock() defer m.mutex.Unlock() - m.hlsManager = s + m.hlsServer = s } // SetRTSPServer is called by core. -func (m *Metrics) SetRTSPServer(s api.RTSPServer) { +func (m *Metrics) SetRTSPServer(s defs.APIRTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspServer = s } // SetRTSPSServer is called by core. -func (m *Metrics) SetRTSPSServer(s api.RTSPServer) { +func (m *Metrics) SetRTSPSServer(s defs.APIRTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspsServer = s } // SetRTMPServer is called by core. -func (m *Metrics) SetRTMPServer(s api.RTMPServer) { +func (m *Metrics) SetRTMPServer(s defs.APIRTMPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtmpServer = s } // SetRTMPSServer is called by core. -func (m *Metrics) SetRTMPSServer(s api.RTMPServer) { +func (m *Metrics) SetRTMPSServer(s defs.APIRTMPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtmpsServer = s } // SetSRTServer is called by core. -func (m *Metrics) SetSRTServer(s api.SRTServer) { +func (m *Metrics) SetSRTServer(s defs.APISRTServer) { m.mutex.Lock() defer m.mutex.Unlock() m.srtServer = s } // SetWebRTCServer is called by core. -func (m *Metrics) SetWebRTCServer(s api.WebRTCServer) { +func (m *Metrics) SetWebRTCServer(s defs.APIWebRTCServer) { m.mutex.Lock() defer m.mutex.Unlock() m.webRTCServer = s diff --git a/internal/servers/hls/server.go b/internal/servers/hls/server.go index 5f81413c..dff30ff6 100644 --- a/internal/servers/hls/server.go +++ b/internal/servers/hls/server.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "reflect" "sort" "sync" @@ -17,6 +18,10 @@ import ( // ErrMuxerNotFound is returned when a muxer is not found. var ErrMuxerNotFound = errors.New("muxer not found") +func interfaceIsEmpty(i interface{}) bool { + return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil() +} + type serverGetMuxerRes struct { muxer *muxer err error @@ -49,7 +54,12 @@ type serverAPIMuxersGetReq struct { res chan serverAPIMuxersGetRes } +type serverMetrics interface { + SetHLSServer(defs.APIHLSServer) +} + type serverPathManager interface { + SetHLSServer(*Server) []defs.Path FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) } @@ -75,6 +85,7 @@ type Server struct { Directory string ReadTimeout conf.Duration MuxerCloseAfter conf.Duration + Metrics serverMetrics PathManager serverPathManager Parent serverParent @@ -129,6 +140,10 @@ func (s *Server) Initialize() error { s.wg.Add(1) go s.run() + if !interfaceIsEmpty(s.Metrics) { + s.Metrics.SetHLSServer(s) + } + return nil } @@ -140,6 +155,11 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { // Close closes the server. func (s *Server) Close() { s.Log(logger.Info, "listener is closing") + + if !interfaceIsEmpty(s.Metrics) { + s.Metrics.SetHLSServer(nil) + } + s.ctxCancel() s.wg.Wait() } @@ -147,6 +167,19 @@ func (s *Server) Close() { func (s *Server) run() { defer s.wg.Done() + readyPaths := s.PathManager.SetHLSServer(s) + defer s.PathManager.SetHLSServer(nil) + + if s.AlwaysRemux { + for _, pa := range readyPaths { + if !pa.SafeConf().SourceOnDemand { + if _, ok := s.muxers[pa.Name()]; !ok { + s.createMuxer(pa.Name(), "", "") + } + } + } + } + outer: for { select { diff --git a/internal/servers/hls/server_test.go b/internal/servers/hls/server_test.go index 120f777a..bb6dfb3e 100644 --- a/internal/servers/hls/server_test.go +++ b/internal/servers/hls/server_test.go @@ -21,6 +21,27 @@ import ( "github.com/stretchr/testify/require" ) +type dummyPathManager struct { + setHLSServerImpl func() []defs.Path + findPathConfImpl func(req defs.PathFindPathConfReq) (*conf.Path, error) + addReaderImpl func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) +} + +func (pm *dummyPathManager) SetHLSServer(*Server) []defs.Path { + if pm.setHLSServerImpl != nil { + return pm.setHLSServerImpl() + } + return nil +} + +func (pm *dummyPathManager) FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) { + return pm.findPathConfImpl(req) +} + +func (pm *dummyPathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + return pm.addReaderImpl(req) +} + type dummyPath struct{} func (pa *dummyPath) Name() string { @@ -53,6 +74,7 @@ func TestPreflightRequest(t *testing.T) { Address: "127.0.0.1:8888", AllowOrigin: "*", ReadTimeout: conf.Duration(10 * time.Second), + PathManager: &dummyPathManager{}, Parent: test.NilLogger, } err := s.Initialize() @@ -90,12 +112,12 @@ func TestServerNotFound(t *testing.T) { "always remux on", } { t.Run(ca, func(t *testing.T) { - pm := &test.PathManager{ - FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { + pm := &dummyPathManager{ + findPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { require.Equal(t, "nonexisting", req.AccessRequest.Name) return &conf.Path{}, nil }, - AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + addReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { require.Equal(t, "nonexisting", req.AccessRequest.Name) return nil, nil, fmt.Errorf("not found") }, @@ -164,15 +186,15 @@ func TestServerRead(t *testing.T) { err := strm.Initialize() require.NoError(t, err) - pm := &test.PathManager{ - FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { + pm := &dummyPathManager{ + findPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { require.Equal(t, "teststream", req.AccessRequest.Name) require.Equal(t, "param=value", req.AccessRequest.Query) require.Equal(t, "myuser", req.AccessRequest.User) require.Equal(t, "mypass", req.AccessRequest.Pass) return &conf.Path{}, nil }, - AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + addReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { require.Equal(t, "teststream", req.AccessRequest.Name) require.Equal(t, "param=value", req.AccessRequest.Query) return &dummyPath{}, strm, nil @@ -264,15 +286,15 @@ func TestServerRead(t *testing.T) { err := strm.Initialize() require.NoError(t, err) - pm := &test.PathManager{ - FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { + pm := &dummyPathManager{ + findPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { require.Equal(t, "teststream", req.AccessRequest.Name) require.Equal(t, "param=value", req.AccessRequest.Query) require.Equal(t, "myuser", req.AccessRequest.User) require.Equal(t, "mypass", req.AccessRequest.Pass) return &conf.Path{}, nil }, - AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + addReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { require.Equal(t, "teststream", req.AccessRequest.Name) require.Equal(t, "", req.AccessRequest.Query) return &dummyPath{}, strm, nil @@ -369,8 +391,8 @@ func TestDirectory(t *testing.T) { err = strm.Initialize() require.NoError(t, err) - pm := &test.PathManager{ - AddReaderImpl: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + pm := &dummyPathManager{ + addReaderImpl: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { return &dummyPath{}, strm, nil }, } @@ -404,3 +426,50 @@ func TestDirectory(t *testing.T) { _, err = os.Stat(filepath.Join(dir, "mydir", "teststream")) require.NoError(t, err) } + +func TestDynamicAlwaysRemux(t *testing.T) { + desc := &description.Session{Medias: []*description.Media{test.MediaH264}} + + strm := &stream.Stream{ + WriteQueueSize: 512, + UDPMaxPayloadSize: 1472, + Desc: desc, + GenerateRTPPackets: true, + Parent: test.NilLogger, + } + err := strm.Initialize() + require.NoError(t, err) + + done := make(chan struct{}) + + pm := &dummyPathManager{ + setHLSServerImpl: func() []defs.Path { + return []defs.Path{&dummyPath{}} + }, + addReaderImpl: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + close(done) + return &dummyPath{}, strm, nil + }, + } + + s := &Server{ + Address: "127.0.0.1:8888", + Encryption: false, + ServerKey: "", + ServerCert: "", + AlwaysRemux: true, + Variant: conf.HLSVariant(gohlslib.MuxerVariantMPEGTS), + SegmentCount: 7, + SegmentDuration: conf.Duration(1 * time.Second), + PartDuration: conf.Duration(200 * time.Millisecond), + SegmentMaxSize: 50 * 1024 * 1024, + ReadTimeout: conf.Duration(10 * time.Second), + PathManager: pm, + Parent: test.NilLogger, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + <-done +} diff --git a/internal/servers/rtmp/server.go b/internal/servers/rtmp/server.go index 2fbb8b84..7573a6aa 100644 --- a/internal/servers/rtmp/server.go +++ b/internal/servers/rtmp/server.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net" + "reflect" "sort" "sync" @@ -24,6 +25,10 @@ import ( // ErrConnNotFound is returned when a connection is not found. var ErrConnNotFound = errors.New("connection not found") +func interfaceIsEmpty(i interface{}) bool { + return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil() +} + type serverAPIConnsListRes struct { data *defs.APIRTMPConnList err error @@ -52,6 +57,11 @@ type serverAPIConnsKickReq struct { res chan serverAPIConnsKickRes } +type serverMetrics interface { + SetRTMPSServer(defs.APIRTMPServer) + SetRTMPServer(defs.APIRTMPServer) +} + type serverPathManager interface { AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) @@ -74,6 +84,7 @@ type Server struct { RunOnConnectRestart bool RunOnDisconnect string ExternalCmdPool *externalcmd.Pool + Metrics serverMetrics PathManager serverPathManager Parent serverParent @@ -140,6 +151,14 @@ func (s *Server) Initialize() error { s.wg.Add(1) go s.run() + if !interfaceIsEmpty(s.Metrics) { + if s.IsTLS { + s.Metrics.SetRTMPSServer(s) + } else { + s.Metrics.SetRTMPServer(s) + } + } + return nil } @@ -157,8 +176,18 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { // Close closes the server. func (s *Server) Close() { s.Log(logger.Info, "listener is closing") + + if !interfaceIsEmpty((s.Metrics)) { + if s.IsTLS { + s.Metrics.SetRTMPSServer(nil) + } else { + s.Metrics.SetRTMPServer(nil) + } + } + s.ctxCancel() s.wg.Wait() + if s.loader != nil { s.loader.Close() } diff --git a/internal/servers/rtsp/server.go b/internal/servers/rtsp/server.go index 7a0b4f84..2b4e1409 100644 --- a/internal/servers/rtsp/server.go +++ b/internal/servers/rtsp/server.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "errors" "fmt" + "reflect" "sort" "strings" "sync" @@ -31,6 +32,10 @@ var ErrConnNotFound = errors.New("connection not found") // ErrSessionNotFound is returned when a session is not found. var ErrSessionNotFound = errors.New("session not found") +func interfaceIsEmpty(i interface{}) bool { + return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil() +} + func printAddresses(srv *gortsplib.Server) string { var ret []string @@ -47,6 +52,11 @@ func printAddresses(srv *gortsplib.Server) string { return strings.Join(ret, ", ") } +type serverMetrics interface { + SetRTSPSServer(defs.APIRTSPServer) + SetRTSPServer(defs.APIRTSPServer) +} + type serverPathManager interface { Describe(req defs.PathDescribeReq) defs.PathDescribeRes AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) @@ -80,6 +90,7 @@ type Server struct { RunOnConnectRestart bool RunOnDisconnect string ExternalCmdPool *externalcmd.Pool + Metrics serverMetrics PathManager serverPathManager Parent serverParent @@ -144,6 +155,14 @@ func (s *Server) Initialize() error { s.wg.Add(1) go s.run() + if !interfaceIsEmpty(s.Metrics) { + if s.IsTLS { + s.Metrics.SetRTSPSServer(s) + } else { + s.Metrics.SetRTSPServer(s) + } + } + return nil } @@ -161,8 +180,18 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { // Close closes the server. func (s *Server) Close() { s.Log(logger.Info, "listener is closing") + + if !interfaceIsEmpty(s.Metrics) { + if s.IsTLS { + s.Metrics.SetRTSPSServer(nil) + } else { + s.Metrics.SetRTSPServer(nil) + } + } + s.ctxCancel() s.wg.Wait() + if s.loader != nil { s.loader.Close() } diff --git a/internal/servers/srt/server.go b/internal/servers/srt/server.go index 8bc6bbe4..9f3d592c 100644 --- a/internal/servers/srt/server.go +++ b/internal/servers/srt/server.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "reflect" "sort" "sync" "time" @@ -22,6 +23,10 @@ import ( // ErrConnNotFound is returned when a connection is not found. var ErrConnNotFound = errors.New("connection not found") +func interfaceIsEmpty(i interface{}) bool { + return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil() +} + func srtMaxPayloadSize(u int) int { return ((u - 16) / 188) * 188 // 16 = SRT header, 188 = MPEG-TS packet } @@ -54,6 +59,10 @@ type serverAPIConnsKickReq struct { res chan serverAPIConnsKickRes } +type serverMetrics interface { + SetSRTServer(defs.APISRTServer) +} + type serverPathManager interface { AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) @@ -74,6 +83,7 @@ type Server struct { RunOnConnectRestart bool RunOnDisconnect string ExternalCmdPool *externalcmd.Pool + Metrics serverMetrics PathManager serverPathManager Parent serverParent @@ -126,6 +136,10 @@ func (s *Server) Initialize() error { s.wg.Add(1) go s.run() + if !interfaceIsEmpty(s.Metrics) { + s.Metrics.SetSRTServer(s) + } + return nil } @@ -137,6 +151,11 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { // Close closes the server. func (s *Server) Close() { s.Log(logger.Info, "listener is closing") + + if !interfaceIsEmpty(s.Metrics) { + s.Metrics.SetSRTServer(nil) + } + s.ctxCancel() s.wg.Wait() } diff --git a/internal/servers/webrtc/server.go b/internal/servers/webrtc/server.go index 0176357b..4c299287 100644 --- a/internal/servers/webrtc/server.go +++ b/internal/servers/webrtc/server.go @@ -11,6 +11,7 @@ import ( "fmt" "net" "net/http" + "reflect" "sort" "strconv" "sync" @@ -36,6 +37,10 @@ const ( // ErrSessionNotFound is returned when a session is not found. var ErrSessionNotFound = errors.New("session not found") +func interfaceIsEmpty(i interface{}) bool { + return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil() +} + type nilWriter struct{} func (nilWriter) Write(p []byte) (int, error) { @@ -163,6 +168,10 @@ type webRTCDeleteSessionReq struct { res chan webRTCDeleteSessionRes } +type serverMetrics interface { + SetWebRTCServer(defs.APIWebRTCServer) +} + type serverPathManager interface { FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) @@ -192,6 +201,7 @@ type Server struct { TrackGatherTimeout conf.Duration STUNGatherTimeout conf.Duration ExternalCmdPool *externalcmd.Pool + Metrics serverMetrics PathManager serverPathManager Parent serverParent @@ -284,6 +294,10 @@ func (s *Server) Initialize() error { go s.run() + if !interfaceIsEmpty(s.Metrics) { + s.Metrics.SetWebRTCServer(s) + } + return nil } @@ -295,6 +309,11 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { // Close closes the server. func (s *Server) Close() { s.Log(logger.Info, "listener is closing") + + if !interfaceIsEmpty(s.Metrics) { + s.Metrics.SetWebRTCServer(nil) + } + s.ctxCancel() <-s.done }