diff --git a/internal/core/hls_manager.go b/internal/core/hls_manager.go index cd8111b1..a129387e 100644 --- a/internal/core/hls_manager.go +++ b/internal/core/hls_manager.go @@ -128,7 +128,7 @@ func newHLSManager( m.Log(logger.Info, "listener opened on "+address) - m.pathManager.hlsManagerSet(m) + m.pathManager.setHLSManager(m) if m.metrics != nil { m.metrics.setHLSManager(m) @@ -223,7 +223,7 @@ outer: m.httpServer.close() - m.pathManager.hlsManagerSet(nil) + m.pathManager.setHLSManager(nil) if m.metrics != nil { m.metrics.setHLSManager(nil) diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 077e0b54..8c9439c2 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -241,7 +241,7 @@ func (m *hlsMuxer) clearQueuedRequests() { } func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { - res := m.pathManager.readerAdd(pathReaderAddReq{ + res := m.pathManager.addReader(pathAddReaderReq{ author: m, pathName: m.pathName, skipAuth: true, @@ -252,7 +252,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) m.path = res.path - defer m.path.readerRemove(pathReaderRemoveReq{author: m}) + defer m.path.removeReader(pathRemoveReaderReq{author: m}) m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount)) diff --git a/internal/core/path.go b/internal/core/path.go index 0cc0326a..ee2650c5 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -38,7 +38,7 @@ type pathParent interface { logger.Writer pathReady(*path) pathNotReady(*path) - onPathClose(*path) + closePath(*path) } type pathOnDemandState int @@ -65,12 +65,12 @@ type pathSourceStaticSetNotReadyReq struct { res chan struct{} } -type pathReaderRemoveReq struct { +type pathRemoveReaderReq struct { author reader res chan struct{} } -type pathPublisherRemoveReq struct { +type pathRemovePublisherReq struct { author publisher res chan struct{} } @@ -101,46 +101,46 @@ type pathDescribeReq struct { res chan pathDescribeRes } -type pathReaderSetupPlayRes struct { +type pathAddReaderRes struct { path *path stream *stream.Stream err error } -type pathReaderAddReq struct { +type pathAddReaderReq struct { author reader pathName string skipAuth bool credentials authCredentials - res chan pathReaderSetupPlayRes + res chan pathAddReaderRes } -type pathPublisherAddRes struct { +type pathAddPublisherRes struct { path *path err error } -type pathPublisherAddReq struct { +type pathAddPublisherReq struct { author publisher pathName string skipAuth bool credentials authCredentials - res chan pathPublisherAddRes + res chan pathAddPublisherRes } -type pathPublisherRecordRes struct { +type pathStartPublisherRes struct { stream *stream.Stream err error } -type pathPublisherStartReq struct { +type pathStartPublisherReq struct { author publisher medias media.Medias generateRTPPackets bool - res chan pathPublisherRecordRes + res chan pathStartPublisherRes } -type pathPublisherStopReq struct { +type pathStopPublisherReq struct { author publisher res chan struct{} } @@ -193,7 +193,7 @@ type path struct { bytesReceived *uint64 readers map[reader]struct{} describeRequestsOnHold []pathDescribeReq - readerAddRequestsOnHold []pathReaderAddReq + readerAddRequestsOnHold []pathAddReaderReq onDemandCmd *externalcmd.Cmd onReadyCmd *externalcmd.Cmd onDemandStaticSourceState pathOnDemandState @@ -208,12 +208,12 @@ type path struct { chSourceStaticSetReady chan pathSourceStaticSetReadyReq chSourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq chDescribe chan pathDescribeReq - chPublisherRemove chan pathPublisherRemoveReq - chPublisherAdd chan pathPublisherAddReq - chPublisherStart chan pathPublisherStartReq - chPublisherStop chan pathPublisherStopReq - chReaderAdd chan pathReaderAddReq - chReaderRemove chan pathReaderRemoveReq + chRemovePublisher chan pathRemovePublisherReq + chAddPublisher chan pathAddPublisherReq + chStartPublisher chan pathStartPublisherReq + chStopPublisher chan pathStopPublisherReq + chAddReader chan pathAddReaderReq + chRemoveReader chan pathRemoveReaderReq chAPIPathsGet chan pathAPIPathsGetReq // out @@ -262,12 +262,12 @@ func newPath( chSourceStaticSetReady: make(chan pathSourceStaticSetReadyReq), chSourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq), chDescribe: make(chan pathDescribeReq), - chPublisherRemove: make(chan pathPublisherRemoveReq), - chPublisherAdd: make(chan pathPublisherAddReq), - chPublisherStart: make(chan pathPublisherStartReq), - chPublisherStop: make(chan pathPublisherStopReq), - chReaderAdd: make(chan pathReaderAddReq), - chReaderRemove: make(chan pathReaderRemoveReq), + chRemovePublisher: make(chan pathRemovePublisherReq), + chAddPublisher: make(chan pathAddPublisherReq), + chStartPublisher: make(chan pathStartPublisherReq), + chStopPublisher: make(chan pathStopPublisherReq), + chAddReader: make(chan pathAddReaderReq), + chRemoveReader: make(chan pathRemoveReaderReq), chAPIPathsGet: make(chan pathAPIPathsGetReq), done: make(chan struct{}), } @@ -341,7 +341,7 @@ func (pa *path) run() { pa.describeRequestsOnHold = nil for _, req := range pa.readerAddRequestsOnHold { - req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } pa.readerAddRequestsOnHold = nil @@ -366,18 +366,18 @@ func (pa *path) run() { pa.describeRequestsOnHold = nil for _, req := range pa.readerAddRequestsOnHold { - req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } pa.readerAddRequestsOnHold = nil - pa.onDemandPublisherStop() + pa.onDemandStopPublisher() if pa.shouldClose() { return fmt.Errorf("not in use") } case <-pa.onDemandPublisherCloseTimer.C: - pa.onDemandPublisherStop() + pa.onDemandStopPublisher() if pa.shouldClose() { return fmt.Errorf("not in use") @@ -411,7 +411,7 @@ func (pa *path) run() { pa.describeRequestsOnHold = nil for _, req := range pa.readerAddRequestsOnHold { - pa.handleReaderAddPost(req) + pa.handleAddReaderPost(req) } pa.readerAddRequestsOnHold = nil } @@ -441,35 +441,35 @@ func (pa *path) run() { return fmt.Errorf("not in use") } - case req := <-pa.chPublisherRemove: - pa.handlePublisherRemove(req) + case req := <-pa.chRemovePublisher: + pa.handleRemovePublisher(req) if pa.shouldClose() { return fmt.Errorf("not in use") } - case req := <-pa.chPublisherAdd: - pa.handlePublisherAdd(req) + case req := <-pa.chAddPublisher: + pa.handleAddPublisher(req) - case req := <-pa.chPublisherStart: - pa.handlePublisherStart(req) + case req := <-pa.chStartPublisher: + pa.handleStartPublisher(req) - case req := <-pa.chPublisherStop: - pa.handlePublisherStop(req) + case req := <-pa.chStopPublisher: + pa.handleStopPublisher(req) if pa.shouldClose() { return fmt.Errorf("not in use") } - case req := <-pa.chReaderAdd: - pa.handleReaderAdd(req) + case req := <-pa.chAddReader: + pa.handleAddReader(req) if pa.shouldClose() { return fmt.Errorf("not in use") } - case req := <-pa.chReaderRemove: - pa.handleReaderRemove(req) + case req := <-pa.chRemoveReader: + pa.handleRemoveReader(req) case req := <-pa.chAPIPathsGet: pa.handleAPIPathsGet(req) @@ -481,7 +481,7 @@ func (pa *path) run() { }() // call before destroying context - pa.parent.onPathClose(pa) + pa.parent.closePath(pa) pa.ctxCancel() @@ -500,7 +500,7 @@ func (pa *path) run() { } for _, req := range pa.readerAddRequestsOnHold { - req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} + req.res <- pathAddReaderRes{err: fmt.Errorf("terminated")} } if pa.stream != nil { @@ -575,7 +575,7 @@ func (pa *path) onDemandStaticSourceStop() { pa.source.(*sourceStatic).stop() } -func (pa *path) onDemandPublisherStart() { +func (pa *path) onDemandStartPublisher() { pa.Log(logger.Info, "runOnDemand command started") pa.onDemandCmd = externalcmd.NewCmd( pa.externalCmdPool, @@ -599,7 +599,7 @@ func (pa *path) onDemandPublisherScheduleClose() { pa.onDemandPublisherState = pathOnDemandStateClosing } -func (pa *path) onDemandPublisherStop() { +func (pa *path) onDemandStopPublisher() { if pa.source != nil { pa.source.(publisher).close() pa.doPublisherRemove() @@ -655,7 +655,7 @@ func (pa *path) setNotReady() { pa.parent.pathNotReady(pa) for r := range pa.readers { - pa.doReaderRemove(r) + pa.doRemoveReader(r) r.close() } @@ -671,7 +671,7 @@ func (pa *path) setNotReady() { } } -func (pa *path) doReaderRemove(r reader) { +func (pa *path) doRemoveReader(r reader) { delete(pa.readers, r) } @@ -708,7 +708,7 @@ func (pa *path) handleDescribe(req pathDescribeReq) { if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateInitial { - pa.onDemandPublisherStart() + pa.onDemandStartPublisher() } pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) return @@ -734,16 +734,16 @@ func (pa *path) handleDescribe(req pathDescribeReq) { req.res <- pathDescribeRes{err: errPathNoOnePublishing{pathName: pa.name}} } -func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) { +func (pa *path) handleRemovePublisher(req pathRemovePublisherReq) { if pa.source == req.author { pa.doPublisherRemove() } close(req.res) } -func (pa *path) handlePublisherAdd(req pathPublisherAddReq) { +func (pa *path) handleAddPublisher(req pathAddPublisherReq) { if pa.conf.Source != "publisher" { - req.res <- pathPublisherAddRes{ + req.res <- pathAddPublisherRes{ err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name), } return @@ -751,7 +751,7 @@ func (pa *path) handlePublisherAdd(req pathPublisherAddReq) { if pa.source != nil { if pa.conf.DisablePublisherOverride { - req.res <- pathPublisherAddRes{err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)} + req.res <- pathAddPublisherRes{err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)} return } @@ -762,18 +762,18 @@ func (pa *path) handlePublisherAdd(req pathPublisherAddReq) { pa.source = req.author - req.res <- pathPublisherAddRes{path: pa} + req.res <- pathAddPublisherRes{path: pa} } -func (pa *path) handlePublisherStart(req pathPublisherStartReq) { +func (pa *path) handleStartPublisher(req pathStartPublisherReq) { if pa.source != req.author { - req.res <- pathPublisherRecordRes{err: fmt.Errorf("publisher is not assigned to this path anymore")} + req.res <- pathStartPublisherRes{err: fmt.Errorf("publisher is not assigned to this path anymore")} return } err := pa.setReady(req.medias, req.generateRTPPackets) if err != nil { - req.res <- pathPublisherRecordRes{err: err} + req.res <- pathStartPublisherRes{err: err} return } @@ -791,24 +791,24 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) { pa.describeRequestsOnHold = nil for _, req := range pa.readerAddRequestsOnHold { - pa.handleReaderAddPost(req) + pa.handleAddReaderPost(req) } pa.readerAddRequestsOnHold = nil } - req.res <- pathPublisherRecordRes{stream: pa.stream} + req.res <- pathStartPublisherRes{stream: pa.stream} } -func (pa *path) handlePublisherStop(req pathPublisherStopReq) { +func (pa *path) handleStopPublisher(req pathStopPublisherReq) { if req.author == pa.source && pa.stream != nil { pa.setNotReady() } close(req.res) } -func (pa *path) handleReaderRemove(req pathReaderRemoveReq) { +func (pa *path) handleRemoveReader(req pathRemoveReaderReq) { if _, ok := pa.readers[req.author]; ok { - pa.doReaderRemove(req.author) + pa.doRemoveReader(req.author) } close(req.res) @@ -825,9 +825,9 @@ func (pa *path) handleReaderRemove(req pathReaderRemoveReq) { } } -func (pa *path) handleReaderAdd(req pathReaderAddReq) { +func (pa *path) handleAddReader(req pathAddReaderReq) { if pa.stream != nil { - pa.handleReaderAddPost(req) + pa.handleAddReaderPost(req) return } @@ -841,16 +841,16 @@ func (pa *path) handleReaderAdd(req pathReaderAddReq) { if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateInitial { - pa.onDemandPublisherStart() + pa.onDemandStartPublisher() } pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req) return } - req.res <- pathReaderSetupPlayRes{err: errPathNoOnePublishing{pathName: pa.name}} + req.res <- pathAddReaderRes{err: errPathNoOnePublishing{pathName: pa.name}} } -func (pa *path) handleReaderAddPost(req pathReaderAddReq) { +func (pa *path) handleAddReaderPost(req pathAddReaderReq) { pa.readers[req.author] = struct{}{} if pa.conf.HasOnDemandStaticSource() { @@ -867,7 +867,7 @@ func (pa *path) handleReaderAddPost(req pathReaderAddReq) { } } - req.res <- pathReaderSetupPlayRes{ + req.res <- pathAddReaderRes{ path: pa, stream: pa.stream, } @@ -962,62 +962,62 @@ func (pa *path) describe(req pathDescribeReq) pathDescribeRes { } } -// publisherRemove is called by a publisher. -func (pa *path) publisherRemove(req pathPublisherRemoveReq) { +// removePublisher is called by a publisher. +func (pa *path) removePublisher(req pathRemovePublisherReq) { req.res = make(chan struct{}) select { - case pa.chPublisherRemove <- req: + case pa.chRemovePublisher <- req: <-req.res case <-pa.ctx.Done(): } } -// publisherAdd is called by a publisher through pathManager. -func (pa *path) publisherAdd(req pathPublisherAddReq) pathPublisherAddRes { +// addPublisher is called by a publisher through pathManager. +func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes { select { - case pa.chPublisherAdd <- req: + case pa.chAddPublisher <- req: return <-req.res case <-pa.ctx.Done(): - return pathPublisherAddRes{err: fmt.Errorf("terminated")} + return pathAddPublisherRes{err: fmt.Errorf("terminated")} } } -// publisherStart is called by a publisher. -func (pa *path) publisherStart(req pathPublisherStartReq) pathPublisherRecordRes { - req.res = make(chan pathPublisherRecordRes) +// startPublisher is called by a publisher. +func (pa *path) startPublisher(req pathStartPublisherReq) pathStartPublisherRes { + req.res = make(chan pathStartPublisherRes) select { - case pa.chPublisherStart <- req: + case pa.chStartPublisher <- req: return <-req.res case <-pa.ctx.Done(): - return pathPublisherRecordRes{err: fmt.Errorf("terminated")} + return pathStartPublisherRes{err: fmt.Errorf("terminated")} } } -// publisherStop is called by a publisher. -func (pa *path) publisherStop(req pathPublisherStopReq) { +// stopPublisher is called by a publisher. +func (pa *path) stopPublisher(req pathStopPublisherReq) { req.res = make(chan struct{}) select { - case pa.chPublisherStop <- req: + case pa.chStopPublisher <- req: <-req.res case <-pa.ctx.Done(): } } -// readerAdd is called by a reader through pathManager. -func (pa *path) readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes { +// addReader is called by a reader through pathManager. +func (pa *path) addReader(req pathAddReaderReq) pathAddReaderRes { select { - case pa.chReaderAdd <- req: + case pa.chAddReader <- req: return <-req.res case <-pa.ctx.Done(): - return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} + return pathAddReaderRes{err: fmt.Errorf("terminated")} } } -// readerRemove is called by a reader. -func (pa *path) readerRemove(req pathReaderRemoveReq) { +// removeReader is called by a reader. +func (pa *path) removeReader(req pathRemoveReaderReq) { req.res = make(chan struct{}) select { - case pa.chReaderRemove <- req: + case pa.chRemoveReader <- req: <-req.res case <-pa.ctx.Done(): } diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 98829b85..da1e13f1 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -84,15 +84,15 @@ type pathManager struct { pathsByConf map[string]map[*path]struct{} // in - chConfReload chan map[string]*conf.PathConf - chPathClose chan *path + chReloadConf chan map[string]*conf.PathConf + chClosePath chan *path chPathReady chan *path chPathNotReady chan *path chGetConfForPath chan pathGetConfForPathReq chDescribe chan pathDescribeReq - chReaderAdd chan pathReaderAddReq - chPublisherAdd chan pathPublisherAddReq - chHLSManagerSet chan pathManagerHLSManager + chAddReader chan pathAddReaderReq + chAddPublisher chan pathAddPublisherReq + chSetHLSManager chan pathManagerHLSManager chAPIPathsList chan pathAPIPathsListReq chAPIPathsGet chan pathAPIPathsGetReq } @@ -128,15 +128,15 @@ func newPathManager( ctxCancel: ctxCancel, paths: make(map[string]*path), pathsByConf: make(map[string]map[*path]struct{}), - chConfReload: make(chan map[string]*conf.PathConf), - chPathClose: make(chan *path), + chReloadConf: make(chan map[string]*conf.PathConf), + chClosePath: make(chan *path), chPathReady: make(chan *path), chPathNotReady: make(chan *path), chGetConfForPath: make(chan pathGetConfForPathReq), chDescribe: make(chan pathDescribeReq), - chReaderAdd: make(chan pathReaderAddReq), - chPublisherAdd: make(chan pathPublisherAddReq), - chHLSManagerSet: make(chan pathManagerHLSManager), + chAddReader: make(chan pathAddReaderReq), + chAddPublisher: make(chan pathAddPublisherReq), + chSetHLSManager: make(chan pathManagerHLSManager), chAPIPathsList: make(chan pathAPIPathsListReq), chAPIPathsGet: make(chan pathAPIPathsGetReq), } @@ -176,7 +176,7 @@ func (pm *pathManager) run() { outer: for { select { - case newPathConfs := <-pm.chConfReload: + case newPathConfs := <-pm.chReloadConf: for confName, pathConf := range pm.pathConfs { if newPathConf, ok := newPathConfs[confName]; ok { // configuration has changed @@ -212,7 +212,7 @@ outer: } } - case pa := <-pm.chPathClose: + case pa := <-pm.chClosePath: if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa { continue } @@ -264,17 +264,17 @@ outer: req.res <- pathDescribeRes{path: pm.paths[req.pathName]} - case req := <-pm.chReaderAdd: + case req := <-pm.chAddReader: pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) if err != nil { - req.res <- pathReaderSetupPlayRes{err: err} + req.res <- pathAddReaderRes{err: err} continue } if !req.skipAuth { err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) if err != nil { - req.res <- pathReaderSetupPlayRes{err: err} + req.res <- pathAddReaderRes{err: err} continue } } @@ -284,19 +284,19 @@ outer: pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) } - req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]} + req.res <- pathAddReaderRes{path: pm.paths[req.pathName]} - case req := <-pm.chPublisherAdd: + case req := <-pm.chAddPublisher: pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) if err != nil { - req.res <- pathPublisherAddRes{err: err} + req.res <- pathAddPublisherRes{err: err} continue } if !req.skipAuth { err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) if err != nil { - req.res <- pathPublisherAddRes{err: err} + req.res <- pathAddPublisherRes{err: err} continue } } @@ -306,9 +306,9 @@ outer: pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) } - req.res <- pathPublisherAddRes{path: pm.paths[req.pathName]} + req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]} - case s := <-pm.chHLSManagerSet: + case s := <-pm.chSetHLSManager: pm.hlsManager = s case req := <-pm.chAPIPathsList: @@ -381,7 +381,7 @@ func (pm *pathManager) removePath(pa *path) { // confReload is called by core. func (pm *pathManager) confReload(pathConfs map[string]*conf.PathConf) { select { - case pm.chConfReload <- pathConfs: + case pm.chReloadConf <- pathConfs: case <-pm.ctx.Done(): } } @@ -404,10 +404,10 @@ func (pm *pathManager) pathNotReady(pa *path) { } } -// onPathClose is called by path. -func (pm *pathManager) onPathClose(pa *path) { +// closePath is called by path. +func (pm *pathManager) closePath(pa *path) { select { - case pm.chPathClose <- pa: + case pm.chClosePath <- pa: case <-pm.ctx.Done(): case <-pa.ctx.Done(): // in case pathManager is blocked by path.wait() } @@ -448,44 +448,44 @@ func (pm *pathManager) describe(req pathDescribeReq) pathDescribeRes { } } -// publisherAnnounce is called by a publisher. -func (pm *pathManager) publisherAdd(req pathPublisherAddReq) pathPublisherAddRes { - req.res = make(chan pathPublisherAddRes) +// addPublisher is called by a publisher. +func (pm *pathManager) addPublisher(req pathAddPublisherReq) pathAddPublisherRes { + req.res = make(chan pathAddPublisherRes) select { - case pm.chPublisherAdd <- req: + case pm.chAddPublisher <- req: res := <-req.res if res.err != nil { return res } - return res.path.publisherAdd(req) + return res.path.addPublisher(req) case <-pm.ctx.Done(): - return pathPublisherAddRes{err: fmt.Errorf("terminated")} + return pathAddPublisherRes{err: fmt.Errorf("terminated")} } } -// readerSetupPlay is called by a reader. -func (pm *pathManager) readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes { - req.res = make(chan pathReaderSetupPlayRes) +// addReader is called by a reader. +func (pm *pathManager) addReader(req pathAddReaderReq) pathAddReaderRes { + req.res = make(chan pathAddReaderRes) select { - case pm.chReaderAdd <- req: + case pm.chAddReader <- req: res := <-req.res if res.err != nil { return res } - return res.path.readerAdd(req) + return res.path.addReader(req) case <-pm.ctx.Done(): - return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} + return pathAddReaderRes{err: fmt.Errorf("terminated")} } } -// hlsManagerSet is called by hlsManager. -func (pm *pathManager) hlsManagerSet(s pathManagerHLSManager) { +// setHLSManager is called by hlsManager. +func (pm *pathManager) setHLSManager(s pathManagerHLSManager) { select { - case pm.chHLSManagerSet <- s: + case pm.chSetHLSManager <- s: case <-pm.ctx.Done(): } } diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 410e0758..f3739553 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -47,8 +47,8 @@ const ( ) type rtmpConnPathManager interface { - readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes - publisherAdd(req pathPublisherAddReq) pathPublisherAddRes + addReader(req pathAddReaderReq) pathAddReaderRes + addPublisher(req pathAddPublisherReq) pathAddPublisherRes } type rtmpConnParent interface { @@ -208,7 +208,7 @@ func (c *rtmpConn) runReader() error { func (c *rtmpConn) runRead(u *url.URL) error { pathName, query, rawQuery := pathNameAndQuery(u) - res := c.pathManager.readerAdd(pathReaderAddReq{ + res := c.pathManager.addReader(pathAddReaderReq{ author: c, pathName: pathName, credentials: authCredentials{ @@ -230,7 +230,7 @@ func (c *rtmpConn) runRead(u *url.URL) error { return res.err } - defer res.path.readerRemove(pathReaderRemoveReq{author: c}) + defer res.path.removeReader(pathRemoveReaderReq{author: c}) c.mutex.Lock() c.state = rtmpConnStateRead @@ -572,7 +572,7 @@ func (c *rtmpConn) setupAudio( func (c *rtmpConn) runPublish(u *url.URL) error { pathName, query, rawQuery := pathNameAndQuery(u) - res := c.pathManager.publisherAdd(pathPublisherAddReq{ + res := c.pathManager.addPublisher(pathAddPublisherReq{ author: c, pathName: pathName, credentials: authCredentials{ @@ -594,7 +594,7 @@ func (c *rtmpConn) runPublish(u *url.URL) error { return res.err } - defer res.path.publisherRemove(pathPublisherRemoveReq{author: c}) + defer res.path.removePublisher(pathRemovePublisherReq{author: c}) c.mutex.Lock() c.state = rtmpConnStatePublish @@ -685,7 +685,7 @@ func (c *rtmpConn) runPublish(u *url.URL) error { } } - rres := res.path.publisherStart(pathPublisherStartReq{ + rres := res.path.startPublisher(pathStartPublisherReq{ author: c, medias: medias, generateRTPPackets: true, diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 6bd9345a..fc537ca6 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -21,8 +21,8 @@ import ( ) type rtspSessionPathManager interface { - publisherAdd(req pathPublisherAddReq) pathPublisherAddRes - readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes + addPublisher(req pathAddPublisherReq) pathAddPublisherRes + addReader(req pathAddReaderReq) pathAddReaderRes } type rtspSessionParent interface { @@ -100,10 +100,10 @@ func (s *rtspSession) onClose(err error) { switch s.session.State() { case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay: - s.path.readerRemove(pathReaderRemoveReq{author: s}) + s.path.removeReader(pathRemoveReaderReq{author: s}) case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord: - s.path.publisherRemove(pathPublisherRemoveReq{author: s}) + s.path.removePublisher(pathRemovePublisherReq{author: s}) } s.path = nil @@ -131,7 +131,7 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno } } - res := s.pathManager.publisherAdd(pathPublisherAddReq{ + res := s.pathManager.addPublisher(pathAddPublisherReq{ author: s, pathName: ctx.Path, credentials: authCredentials{ @@ -216,7 +216,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt } } - res := s.pathManager.readerAdd(pathReaderAddReq{ + res := s.pathManager.addReader(pathAddReaderReq{ author: s, pathName: ctx.Path, credentials: authCredentials{ @@ -304,7 +304,7 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons // onRecord is called by rtspServer. func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { - res := s.path.publisherStart(pathPublisherStartReq{ + res := s.path.startPublisher(pathStartPublisherReq{ author: s, medias: s.session.AnnouncedMedias(), generateRTPPackets: false, @@ -356,7 +356,7 @@ func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Respo s.mutex.Unlock() case gortsplib.ServerSessionStateRecord: - s.path.publisherStop(pathPublisherStopReq{author: s}) + s.path.stopPublisher(pathStopPublisherReq{author: s}) s.mutex.Lock() s.state = gortsplib.ServerSessionStatePreRecord diff --git a/internal/core/webrtc_http_server.go b/internal/core/webrtc_http_server.go index 22086c65..b6332a5d 100644 --- a/internal/core/webrtc_http_server.go +++ b/internal/core/webrtc_http_server.go @@ -162,8 +162,8 @@ func marshalICEFragment(offer *webrtc.SessionDescription, candidates []*webrtc.I type webRTCHTTPServerParent interface { logger.Writer generateICEServers() ([]webrtc.ICEServer, error) - sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes - sessionAddCandidates(req webRTCSessionAddCandidatesReq) webRTCSessionAddCandidatesRes + newSession(req webRTCNewSessionReq) webRTCNewSessionRes + addSessionCandidates(req webRTCAddSessionCandidatesReq) webRTCAddSessionCandidatesRes } type webRTCHTTPServer struct { @@ -372,7 +372,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { return } - res := s.parent.sessionNew(webRTCSessionNewReq{ + res := s.parent.newSession(webRTCNewSessionReq{ pathName: dir, remoteAddr: remoteAddr, query: ctx.Request.URL.RawQuery, @@ -425,7 +425,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { return } - res := s.parent.sessionAddCandidates(webRTCSessionAddCandidatesReq{ + res := s.parent.addSessionCandidates(webRTCAddSessionCandidatesReq{ secret: secret, candidates: candidates, }) diff --git a/internal/core/webrtc_incoming_track.go b/internal/core/webrtc_incoming_track.go index 86002609..b4613dc4 100644 --- a/internal/core/webrtc_incoming_track.go +++ b/internal/core/webrtc_incoming_track.go @@ -7,9 +7,10 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/media" - "github.com/bluenviron/mediamtx/internal/stream" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" + + "github.com/bluenviron/mediamtx/internal/stream" ) const ( diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index 34ccf2ff..aebaf512 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -112,14 +112,14 @@ type webRTCManagerAPISessionsGetReq struct { res chan webRTCManagerAPISessionsGetRes } -type webRTCSessionNewRes struct { +type webRTCNewSessionRes struct { sx *webRTCSession answer []byte err error errStatusCode int } -type webRTCSessionNewReq struct { +type webRTCNewSessionReq struct { pathName string remoteAddr string query string @@ -127,18 +127,18 @@ type webRTCSessionNewReq struct { pass string offer []byte publish bool - res chan webRTCSessionNewRes + res chan webRTCNewSessionRes } -type webRTCSessionAddCandidatesRes struct { +type webRTCAddSessionCandidatesRes struct { sx *webRTCSession err error } -type webRTCSessionAddCandidatesReq struct { +type webRTCAddSessionCandidatesReq struct { secret uuid.UUID candidates []*webrtc.ICECandidateInit - res chan webRTCSessionAddCandidatesRes + res chan webRTCAddSessionCandidatesRes } type webRTCManagerParent interface { @@ -166,9 +166,9 @@ type webRTCManager struct { iceTCPMux ice.TCPMux // in - chSessionNew chan webRTCSessionNewReq - chSessionClose chan *webRTCSession - chSessionAddCandidates chan webRTCSessionAddCandidatesReq + chNewSession chan webRTCNewSessionReq + chCloseSession chan *webRTCSession + chAddSessionCandidates chan webRTCAddSessionCandidatesReq chAPISessionsList chan webRTCManagerAPISessionsListReq chAPISessionsGet chan webRTCManagerAPISessionsGetReq chAPIConnsKick chan webRTCManagerAPISessionsKickReq @@ -209,9 +209,9 @@ func newWebRTCManager( iceHostNAT1To1IPs: iceHostNAT1To1IPs, sessions: make(map[*webRTCSession]struct{}), sessionsBySecret: make(map[uuid.UUID]*webRTCSession), - chSessionNew: make(chan webRTCSessionNewReq), - chSessionClose: make(chan *webRTCSession), - chSessionAddCandidates: make(chan webRTCSessionAddCandidatesReq), + chNewSession: make(chan webRTCNewSessionReq), + chCloseSession: make(chan *webRTCSession), + chAddSessionCandidates: make(chan webRTCAddSessionCandidatesReq), chAPISessionsList: make(chan webRTCManagerAPISessionsListReq), chAPISessionsGet: make(chan webRTCManagerAPISessionsGetReq), chAPIConnsKick: make(chan webRTCManagerAPISessionsKickReq), @@ -293,7 +293,7 @@ func (m *webRTCManager) run() { outer: for { select { - case req := <-m.chSessionNew: + case req := <-m.chNewSession: sx := newWebRTCSession( m.ctx, m.readBufferCount, @@ -307,20 +307,20 @@ outer: ) m.sessions[sx] = struct{}{} m.sessionsBySecret[sx.secret] = sx - req.res <- webRTCSessionNewRes{sx: sx} + req.res <- webRTCNewSessionRes{sx: sx} - case sx := <-m.chSessionClose: + case sx := <-m.chCloseSession: delete(m.sessions, sx) delete(m.sessionsBySecret, sx.secret) - case req := <-m.chSessionAddCandidates: + case req := <-m.chAddSessionCandidates: sx, ok := m.sessionsBySecret[req.secret] if !ok { - req.res <- webRTCSessionAddCandidatesRes{err: fmt.Errorf("session not found")} + req.res <- webRTCAddSessionCandidatesRes{err: fmt.Errorf("session not found")} continue } - req.res <- webRTCSessionAddCandidatesRes{sx: sx} + req.res <- webRTCAddSessionCandidatesRes{sx: sx} case req := <-m.chAPISessionsList: data := &apiWebRTCSessionsList{ @@ -417,36 +417,36 @@ func (m *webRTCManager) generateICEServers() ([]webrtc.ICEServer, error) { return ret, nil } -// sessionNew is called by webRTCHTTPServer. -func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes { - req.res = make(chan webRTCSessionNewRes) +// newSession is called by webRTCHTTPServer. +func (m *webRTCManager) newSession(req webRTCNewSessionReq) webRTCNewSessionRes { + req.res = make(chan webRTCNewSessionRes) select { - case m.chSessionNew <- req: + case m.chNewSession <- req: res := <-req.res return res.sx.new(req) case <-m.ctx.Done(): - return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} + return webRTCNewSessionRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} } } -// sessionClose is called by webRTCSession. -func (m *webRTCManager) sessionClose(sx *webRTCSession) { +// closeSession is called by webRTCSession. +func (m *webRTCManager) closeSession(sx *webRTCSession) { select { - case m.chSessionClose <- sx: + case m.chCloseSession <- sx: case <-m.ctx.Done(): } } -// sessionAddCandidates is called by webRTCHTTPServer. -func (m *webRTCManager) sessionAddCandidates( - req webRTCSessionAddCandidatesReq, -) webRTCSessionAddCandidatesRes { - req.res = make(chan webRTCSessionAddCandidatesRes) +// addSessionCandidates is called by webRTCHTTPServer. +func (m *webRTCManager) addSessionCandidates( + req webRTCAddSessionCandidatesReq, +) webRTCAddSessionCandidatesRes { + req.res = make(chan webRTCAddSessionCandidatesRes) select { - case m.chSessionAddCandidates <- req: + case m.chAddSessionCandidates <- req: res1 := <-req.res if res1.err != nil { return res1 @@ -455,7 +455,7 @@ func (m *webRTCManager) sessionAddCandidates( return res1.sx.addCandidates(req) case <-m.ctx.Done(): - return webRTCSessionAddCandidatesRes{err: fmt.Errorf("terminated")} + return webRTCAddSessionCandidatesRes{err: fmt.Errorf("terminated")} } } diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index 93592e2b..8f6859c5 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -131,13 +131,13 @@ func gatherIncomingTracks( } type webRTCSessionPathManager interface { - publisherAdd(req pathPublisherAddReq) pathPublisherAddRes - readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes + addPublisher(req pathAddPublisherReq) pathAddPublisherRes + addReader(req pathAddReaderReq) pathAddReaderRes } type webRTCSession struct { readBufferCount int - req webRTCSessionNewReq + req webRTCNewSessionReq wg *sync.WaitGroup iceHostNAT1To1IPs []string iceUDPMux ice.UDPMux @@ -145,23 +145,22 @@ type webRTCSession struct { pathManager webRTCSessionPathManager parent *webRTCManager - ctx context.Context - ctxCancel func() - created time.Time - uuid uuid.UUID - secret uuid.UUID - answerSent bool - mutex sync.RWMutex - pc *peerConnection + ctx context.Context + ctxCancel func() + created time.Time + uuid uuid.UUID + secret uuid.UUID + mutex sync.RWMutex + pc *peerConnection - chNew chan webRTCSessionNewReq - chAddCandidates chan webRTCSessionAddCandidatesReq + chNew chan webRTCNewSessionReq + chAddCandidates chan webRTCAddSessionCandidatesReq } func newWebRTCSession( parentCtx context.Context, readBufferCount int, - req webRTCSessionNewReq, + req webRTCNewSessionReq, wg *sync.WaitGroup, iceHostNAT1To1IPs []string, iceUDPMux ice.UDPMux, @@ -185,8 +184,8 @@ func newWebRTCSession( created: time.Now(), uuid: uuid.New(), secret: uuid.New(), - chNew: make(chan webRTCSessionNewReq), - chAddCandidates: make(chan webRTCSessionAddCandidatesReq), + chNew: make(chan webRTCNewSessionReq), + chAddCandidates: make(chan webRTCAddSessionCandidatesReq), } s.Log(logger.Info, "created by %s", req.remoteAddr) @@ -213,7 +212,7 @@ func (s *webRTCSession) run() { s.ctxCancel() - s.parent.sessionClose(s) + s.parent.closeSession(s) s.Log(logger.Info, "closed (%v)", err) } @@ -221,16 +220,14 @@ func (s *webRTCSession) run() { func (s *webRTCSession) runInner() error { select { case <-s.chNew: - // do not store the request, we already have it - case <-s.ctx.Done(): return fmt.Errorf("terminated") } errStatusCode, err := s.runInner2() - if !s.answerSent { - s.req.res <- webRTCSessionNewRes{ + if errStatusCode != 0 { + s.req.res <- webRTCNewSessionRes{ err: err, errStatusCode: errStatusCode, } @@ -249,7 +246,7 @@ func (s *webRTCSession) runInner2() (int, error) { func (s *webRTCSession) runPublish() (int, error) { ip, _, _ := net.SplitHostPort(s.req.remoteAddr) - res := s.pathManager.publisherAdd(pathPublisherAddReq{ + res := s.pathManager.addPublisher(pathAddPublisherReq{ author: s, pathName: s.req.pathName, credentials: authCredentials{ @@ -272,7 +269,7 @@ func (s *webRTCSession) runPublish() (int, error) { return http.StatusBadRequest, res.err } - defer res.path.publisherRemove(pathPublisherRemoveReq{author: s}) + defer res.path.removePublisher(pathRemovePublisherReq{author: s}) servers, err := s.parent.generateICEServers() if err != nil { @@ -388,7 +385,7 @@ func (s *webRTCSession) runPublish() (int, error) { } medias := mediasOfIncomingTracks(tracks) - rres := res.path.publisherStart(pathPublisherStartReq{ + rres := res.path.startPublisher(pathStartPublisherReq{ author: s, medias: medias, generateRTPPackets: false, @@ -417,7 +414,7 @@ func (s *webRTCSession) runPublish() (int, error) { func (s *webRTCSession) runRead() (int, error) { ip, _, _ := net.SplitHostPort(s.req.remoteAddr) - res := s.pathManager.readerAdd(pathReaderAddReq{ + res := s.pathManager.addReader(pathAddReaderReq{ author: s, pathName: s.req.pathName, credentials: authCredentials{ @@ -444,7 +441,7 @@ func (s *webRTCSession) runRead() (int, error) { return http.StatusBadRequest, res.err } - defer res.path.readerRemove(pathReaderRemoveReq{author: s}) + defer res.path.removeReader(pathRemoveReaderReq{author: s}) tracks, err := gatherOutgoingTracks(res.stream.Medias()) if err != nil { @@ -569,11 +566,10 @@ func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error { } func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) { - s.req.res <- webRTCSessionNewRes{ + s.req.res <- webRTCNewSessionRes{ sx: s, answer: []byte(answer.SDP), } - s.answerSent = true } func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { @@ -583,10 +579,10 @@ func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { for _, candidate := range req.candidates { err := pc.AddICECandidate(*candidate) if err != nil { - req.res <- webRTCSessionAddCandidatesRes{err: err} + req.res <- webRTCAddSessionCandidatesRes{err: err} } } - req.res <- webRTCSessionAddCandidatesRes{} + req.res <- webRTCAddSessionCandidatesRes{} case <-s.ctx.Done(): return @@ -595,26 +591,26 @@ func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { } // new is called by webRTCHTTPServer through webRTCManager. -func (s *webRTCSession) new(req webRTCSessionNewReq) webRTCSessionNewRes { +func (s *webRTCSession) new(req webRTCNewSessionReq) webRTCNewSessionRes { select { case s.chNew <- req: return <-req.res case <-s.ctx.Done(): - return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} + return webRTCNewSessionRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} } } // addCandidates is called by webRTCHTTPServer through webRTCManager. func (s *webRTCSession) addCandidates( - req webRTCSessionAddCandidatesReq, -) webRTCSessionAddCandidatesRes { + req webRTCAddSessionCandidatesReq, +) webRTCAddSessionCandidatesRes { select { case s.chAddCandidates <- req: return <-req.res case <-s.ctx.Done(): - return webRTCSessionAddCandidatesRes{err: fmt.Errorf("terminated")} + return webRTCAddSessionCandidatesRes{err: fmt.Errorf("terminated")} } }