normalize channels and methods (#2127)
Some checks reported warnings
lint / code (push) Has been cancelled
lint / mod-tidy (push) Has been cancelled
lint / apidocs (push) Has been cancelled
test / test64 (push) Has been cancelled
test / test32 (push) Has been cancelled
test / test_highlevel (push) Has been cancelled

needed by #2068
This commit is contained in:
Alessandro Ros
2023-07-30 23:53:39 +02:00
committed by GitHub
parent e3d4856b4f
commit 72b1d233df
10 changed files with 220 additions and 223 deletions

View File

@@ -128,7 +128,7 @@ func newHLSManager(
m.Log(logger.Info, "listener opened on "+address) m.Log(logger.Info, "listener opened on "+address)
m.pathManager.hlsManagerSet(m) m.pathManager.setHLSManager(m)
if m.metrics != nil { if m.metrics != nil {
m.metrics.setHLSManager(m) m.metrics.setHLSManager(m)
@@ -223,7 +223,7 @@ outer:
m.httpServer.close() m.httpServer.close()
m.pathManager.hlsManagerSet(nil) m.pathManager.setHLSManager(nil)
if m.metrics != nil { if m.metrics != nil {
m.metrics.setHLSManager(nil) m.metrics.setHLSManager(nil)

View File

@@ -241,7 +241,7 @@ func (m *hlsMuxer) clearQueuedRequests() {
} }
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := m.pathManager.readerAdd(pathReaderAddReq{ res := m.pathManager.addReader(pathAddReaderReq{
author: m, author: m,
pathName: m.pathName, pathName: m.pathName,
skipAuth: true, skipAuth: true,
@@ -252,7 +252,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.path = res.path m.path = res.path
defer m.path.readerRemove(pathReaderRemoveReq{author: m}) defer m.path.removeReader(pathRemoveReaderReq{author: m})
m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount)) m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount))

View File

@@ -38,7 +38,7 @@ type pathParent interface {
logger.Writer logger.Writer
pathReady(*path) pathReady(*path)
pathNotReady(*path) pathNotReady(*path)
onPathClose(*path) closePath(*path)
} }
type pathOnDemandState int type pathOnDemandState int
@@ -65,12 +65,12 @@ type pathSourceStaticSetNotReadyReq struct {
res chan struct{} res chan struct{}
} }
type pathReaderRemoveReq struct { type pathRemoveReaderReq struct {
author reader author reader
res chan struct{} res chan struct{}
} }
type pathPublisherRemoveReq struct { type pathRemovePublisherReq struct {
author publisher author publisher
res chan struct{} res chan struct{}
} }
@@ -101,46 +101,46 @@ type pathDescribeReq struct {
res chan pathDescribeRes res chan pathDescribeRes
} }
type pathReaderSetupPlayRes struct { type pathAddReaderRes struct {
path *path path *path
stream *stream.Stream stream *stream.Stream
err error err error
} }
type pathReaderAddReq struct { type pathAddReaderReq struct {
author reader author reader
pathName string pathName string
skipAuth bool skipAuth bool
credentials authCredentials credentials authCredentials
res chan pathReaderSetupPlayRes res chan pathAddReaderRes
} }
type pathPublisherAddRes struct { type pathAddPublisherRes struct {
path *path path *path
err error err error
} }
type pathPublisherAddReq struct { type pathAddPublisherReq struct {
author publisher author publisher
pathName string pathName string
skipAuth bool skipAuth bool
credentials authCredentials credentials authCredentials
res chan pathPublisherAddRes res chan pathAddPublisherRes
} }
type pathPublisherRecordRes struct { type pathStartPublisherRes struct {
stream *stream.Stream stream *stream.Stream
err error err error
} }
type pathPublisherStartReq struct { type pathStartPublisherReq struct {
author publisher author publisher
medias media.Medias medias media.Medias
generateRTPPackets bool generateRTPPackets bool
res chan pathPublisherRecordRes res chan pathStartPublisherRes
} }
type pathPublisherStopReq struct { type pathStopPublisherReq struct {
author publisher author publisher
res chan struct{} res chan struct{}
} }
@@ -193,7 +193,7 @@ type path struct {
bytesReceived *uint64 bytesReceived *uint64
readers map[reader]struct{} readers map[reader]struct{}
describeRequestsOnHold []pathDescribeReq describeRequestsOnHold []pathDescribeReq
readerAddRequestsOnHold []pathReaderAddReq readerAddRequestsOnHold []pathAddReaderReq
onDemandCmd *externalcmd.Cmd onDemandCmd *externalcmd.Cmd
onReadyCmd *externalcmd.Cmd onReadyCmd *externalcmd.Cmd
onDemandStaticSourceState pathOnDemandState onDemandStaticSourceState pathOnDemandState
@@ -208,12 +208,12 @@ type path struct {
chSourceStaticSetReady chan pathSourceStaticSetReadyReq chSourceStaticSetReady chan pathSourceStaticSetReadyReq
chSourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq chSourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq
chDescribe chan pathDescribeReq chDescribe chan pathDescribeReq
chPublisherRemove chan pathPublisherRemoveReq chRemovePublisher chan pathRemovePublisherReq
chPublisherAdd chan pathPublisherAddReq chAddPublisher chan pathAddPublisherReq
chPublisherStart chan pathPublisherStartReq chStartPublisher chan pathStartPublisherReq
chPublisherStop chan pathPublisherStopReq chStopPublisher chan pathStopPublisherReq
chReaderAdd chan pathReaderAddReq chAddReader chan pathAddReaderReq
chReaderRemove chan pathReaderRemoveReq chRemoveReader chan pathRemoveReaderReq
chAPIPathsGet chan pathAPIPathsGetReq chAPIPathsGet chan pathAPIPathsGetReq
// out // out
@@ -262,12 +262,12 @@ func newPath(
chSourceStaticSetReady: make(chan pathSourceStaticSetReadyReq), chSourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
chSourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq), chSourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
chDescribe: make(chan pathDescribeReq), chDescribe: make(chan pathDescribeReq),
chPublisherRemove: make(chan pathPublisherRemoveReq), chRemovePublisher: make(chan pathRemovePublisherReq),
chPublisherAdd: make(chan pathPublisherAddReq), chAddPublisher: make(chan pathAddPublisherReq),
chPublisherStart: make(chan pathPublisherStartReq), chStartPublisher: make(chan pathStartPublisherReq),
chPublisherStop: make(chan pathPublisherStopReq), chStopPublisher: make(chan pathStopPublisherReq),
chReaderAdd: make(chan pathReaderAddReq), chAddReader: make(chan pathAddReaderReq),
chReaderRemove: make(chan pathReaderRemoveReq), chRemoveReader: make(chan pathRemoveReaderReq),
chAPIPathsGet: make(chan pathAPIPathsGetReq), chAPIPathsGet: make(chan pathAPIPathsGetReq),
done: make(chan struct{}), done: make(chan struct{}),
} }
@@ -341,7 +341,7 @@ func (pa *path) run() {
pa.describeRequestsOnHold = nil pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold { for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
} }
pa.readerAddRequestsOnHold = nil pa.readerAddRequestsOnHold = nil
@@ -366,18 +366,18 @@ func (pa *path) run() {
pa.describeRequestsOnHold = nil pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold { for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
} }
pa.readerAddRequestsOnHold = nil pa.readerAddRequestsOnHold = nil
pa.onDemandPublisherStop() pa.onDemandStopPublisher()
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case <-pa.onDemandPublisherCloseTimer.C: case <-pa.onDemandPublisherCloseTimer.C:
pa.onDemandPublisherStop() pa.onDemandStopPublisher()
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
@@ -411,7 +411,7 @@ func (pa *path) run() {
pa.describeRequestsOnHold = nil pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold { for _, req := range pa.readerAddRequestsOnHold {
pa.handleReaderAddPost(req) pa.handleAddReaderPost(req)
} }
pa.readerAddRequestsOnHold = nil pa.readerAddRequestsOnHold = nil
} }
@@ -441,35 +441,35 @@ func (pa *path) run() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chPublisherRemove: case req := <-pa.chRemovePublisher:
pa.handlePublisherRemove(req) pa.handleRemovePublisher(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chPublisherAdd: case req := <-pa.chAddPublisher:
pa.handlePublisherAdd(req) pa.handleAddPublisher(req)
case req := <-pa.chPublisherStart: case req := <-pa.chStartPublisher:
pa.handlePublisherStart(req) pa.handleStartPublisher(req)
case req := <-pa.chPublisherStop: case req := <-pa.chStopPublisher:
pa.handlePublisherStop(req) pa.handleStopPublisher(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chReaderAdd: case req := <-pa.chAddReader:
pa.handleReaderAdd(req) pa.handleAddReader(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chReaderRemove: case req := <-pa.chRemoveReader:
pa.handleReaderRemove(req) pa.handleRemoveReader(req)
case req := <-pa.chAPIPathsGet: case req := <-pa.chAPIPathsGet:
pa.handleAPIPathsGet(req) pa.handleAPIPathsGet(req)
@@ -481,7 +481,7 @@ func (pa *path) run() {
}() }()
// call before destroying context // call before destroying context
pa.parent.onPathClose(pa) pa.parent.closePath(pa)
pa.ctxCancel() pa.ctxCancel()
@@ -500,7 +500,7 @@ func (pa *path) run() {
} }
for _, req := range pa.readerAddRequestsOnHold { for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} req.res <- pathAddReaderRes{err: fmt.Errorf("terminated")}
} }
if pa.stream != nil { if pa.stream != nil {
@@ -575,7 +575,7 @@ func (pa *path) onDemandStaticSourceStop() {
pa.source.(*sourceStatic).stop() pa.source.(*sourceStatic).stop()
} }
func (pa *path) onDemandPublisherStart() { func (pa *path) onDemandStartPublisher() {
pa.Log(logger.Info, "runOnDemand command started") pa.Log(logger.Info, "runOnDemand command started")
pa.onDemandCmd = externalcmd.NewCmd( pa.onDemandCmd = externalcmd.NewCmd(
pa.externalCmdPool, pa.externalCmdPool,
@@ -599,7 +599,7 @@ func (pa *path) onDemandPublisherScheduleClose() {
pa.onDemandPublisherState = pathOnDemandStateClosing pa.onDemandPublisherState = pathOnDemandStateClosing
} }
func (pa *path) onDemandPublisherStop() { func (pa *path) onDemandStopPublisher() {
if pa.source != nil { if pa.source != nil {
pa.source.(publisher).close() pa.source.(publisher).close()
pa.doPublisherRemove() pa.doPublisherRemove()
@@ -655,7 +655,7 @@ func (pa *path) setNotReady() {
pa.parent.pathNotReady(pa) pa.parent.pathNotReady(pa)
for r := range pa.readers { for r := range pa.readers {
pa.doReaderRemove(r) pa.doRemoveReader(r)
r.close() r.close()
} }
@@ -671,7 +671,7 @@ func (pa *path) setNotReady() {
} }
} }
func (pa *path) doReaderRemove(r reader) { func (pa *path) doRemoveReader(r reader) {
delete(pa.readers, r) delete(pa.readers, r)
} }
@@ -708,7 +708,7 @@ func (pa *path) handleDescribe(req pathDescribeReq) {
if pa.conf.HasOnDemandPublisher() { if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateInitial { if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart() pa.onDemandStartPublisher()
} }
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
return return
@@ -734,16 +734,16 @@ func (pa *path) handleDescribe(req pathDescribeReq) {
req.res <- pathDescribeRes{err: errPathNoOnePublishing{pathName: pa.name}} req.res <- pathDescribeRes{err: errPathNoOnePublishing{pathName: pa.name}}
} }
func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) { func (pa *path) handleRemovePublisher(req pathRemovePublisherReq) {
if pa.source == req.author { if pa.source == req.author {
pa.doPublisherRemove() pa.doPublisherRemove()
} }
close(req.res) close(req.res)
} }
func (pa *path) handlePublisherAdd(req pathPublisherAddReq) { func (pa *path) handleAddPublisher(req pathAddPublisherReq) {
if pa.conf.Source != "publisher" { if pa.conf.Source != "publisher" {
req.res <- pathPublisherAddRes{ req.res <- pathAddPublisherRes{
err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name), err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name),
} }
return return
@@ -751,7 +751,7 @@ func (pa *path) handlePublisherAdd(req pathPublisherAddReq) {
if pa.source != nil { if pa.source != nil {
if pa.conf.DisablePublisherOverride { if pa.conf.DisablePublisherOverride {
req.res <- pathPublisherAddRes{err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)} req.res <- pathAddPublisherRes{err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)}
return return
} }
@@ -762,18 +762,18 @@ func (pa *path) handlePublisherAdd(req pathPublisherAddReq) {
pa.source = req.author pa.source = req.author
req.res <- pathPublisherAddRes{path: pa} req.res <- pathAddPublisherRes{path: pa}
} }
func (pa *path) handlePublisherStart(req pathPublisherStartReq) { func (pa *path) handleStartPublisher(req pathStartPublisherReq) {
if pa.source != req.author { if pa.source != req.author {
req.res <- pathPublisherRecordRes{err: fmt.Errorf("publisher is not assigned to this path anymore")} req.res <- pathStartPublisherRes{err: fmt.Errorf("publisher is not assigned to this path anymore")}
return return
} }
err := pa.setReady(req.medias, req.generateRTPPackets) err := pa.setReady(req.medias, req.generateRTPPackets)
if err != nil { if err != nil {
req.res <- pathPublisherRecordRes{err: err} req.res <- pathStartPublisherRes{err: err}
return return
} }
@@ -791,24 +791,24 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) {
pa.describeRequestsOnHold = nil pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold { for _, req := range pa.readerAddRequestsOnHold {
pa.handleReaderAddPost(req) pa.handleAddReaderPost(req)
} }
pa.readerAddRequestsOnHold = nil pa.readerAddRequestsOnHold = nil
} }
req.res <- pathPublisherRecordRes{stream: pa.stream} req.res <- pathStartPublisherRes{stream: pa.stream}
} }
func (pa *path) handlePublisherStop(req pathPublisherStopReq) { func (pa *path) handleStopPublisher(req pathStopPublisherReq) {
if req.author == pa.source && pa.stream != nil { if req.author == pa.source && pa.stream != nil {
pa.setNotReady() pa.setNotReady()
} }
close(req.res) close(req.res)
} }
func (pa *path) handleReaderRemove(req pathReaderRemoveReq) { func (pa *path) handleRemoveReader(req pathRemoveReaderReq) {
if _, ok := pa.readers[req.author]; ok { if _, ok := pa.readers[req.author]; ok {
pa.doReaderRemove(req.author) pa.doRemoveReader(req.author)
} }
close(req.res) close(req.res)
@@ -825,9 +825,9 @@ func (pa *path) handleReaderRemove(req pathReaderRemoveReq) {
} }
} }
func (pa *path) handleReaderAdd(req pathReaderAddReq) { func (pa *path) handleAddReader(req pathAddReaderReq) {
if pa.stream != nil { if pa.stream != nil {
pa.handleReaderAddPost(req) pa.handleAddReaderPost(req)
return return
} }
@@ -841,16 +841,16 @@ func (pa *path) handleReaderAdd(req pathReaderAddReq) {
if pa.conf.HasOnDemandPublisher() { if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateInitial { if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart() pa.onDemandStartPublisher()
} }
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req) pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req)
return return
} }
req.res <- pathReaderSetupPlayRes{err: errPathNoOnePublishing{pathName: pa.name}} req.res <- pathAddReaderRes{err: errPathNoOnePublishing{pathName: pa.name}}
} }
func (pa *path) handleReaderAddPost(req pathReaderAddReq) { func (pa *path) handleAddReaderPost(req pathAddReaderReq) {
pa.readers[req.author] = struct{}{} pa.readers[req.author] = struct{}{}
if pa.conf.HasOnDemandStaticSource() { if pa.conf.HasOnDemandStaticSource() {
@@ -867,7 +867,7 @@ func (pa *path) handleReaderAddPost(req pathReaderAddReq) {
} }
} }
req.res <- pathReaderSetupPlayRes{ req.res <- pathAddReaderRes{
path: pa, path: pa,
stream: pa.stream, stream: pa.stream,
} }
@@ -962,62 +962,62 @@ func (pa *path) describe(req pathDescribeReq) pathDescribeRes {
} }
} }
// publisherRemove is called by a publisher. // removePublisher is called by a publisher.
func (pa *path) publisherRemove(req pathPublisherRemoveReq) { func (pa *path) removePublisher(req pathRemovePublisherReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.chPublisherRemove <- req: case pa.chRemovePublisher <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }
} }
// publisherAdd is called by a publisher through pathManager. // addPublisher is called by a publisher through pathManager.
func (pa *path) publisherAdd(req pathPublisherAddReq) pathPublisherAddRes { func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes {
select { select {
case pa.chPublisherAdd <- req: case pa.chAddPublisher <- req:
return <-req.res return <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return pathPublisherAddRes{err: fmt.Errorf("terminated")} return pathAddPublisherRes{err: fmt.Errorf("terminated")}
} }
} }
// publisherStart is called by a publisher. // startPublisher is called by a publisher.
func (pa *path) publisherStart(req pathPublisherStartReq) pathPublisherRecordRes { func (pa *path) startPublisher(req pathStartPublisherReq) pathStartPublisherRes {
req.res = make(chan pathPublisherRecordRes) req.res = make(chan pathStartPublisherRes)
select { select {
case pa.chPublisherStart <- req: case pa.chStartPublisher <- req:
return <-req.res return <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return pathPublisherRecordRes{err: fmt.Errorf("terminated")} return pathStartPublisherRes{err: fmt.Errorf("terminated")}
} }
} }
// publisherStop is called by a publisher. // stopPublisher is called by a publisher.
func (pa *path) publisherStop(req pathPublisherStopReq) { func (pa *path) stopPublisher(req pathStopPublisherReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.chPublisherStop <- req: case pa.chStopPublisher <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }
} }
// readerAdd is called by a reader through pathManager. // addReader is called by a reader through pathManager.
func (pa *path) readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes { func (pa *path) addReader(req pathAddReaderReq) pathAddReaderRes {
select { select {
case pa.chReaderAdd <- req: case pa.chAddReader <- req:
return <-req.res return <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} return pathAddReaderRes{err: fmt.Errorf("terminated")}
} }
} }
// readerRemove is called by a reader. // removeReader is called by a reader.
func (pa *path) readerRemove(req pathReaderRemoveReq) { func (pa *path) removeReader(req pathRemoveReaderReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.chReaderRemove <- req: case pa.chRemoveReader <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }

View File

@@ -84,15 +84,15 @@ type pathManager struct {
pathsByConf map[string]map[*path]struct{} pathsByConf map[string]map[*path]struct{}
// in // in
chConfReload chan map[string]*conf.PathConf chReloadConf chan map[string]*conf.PathConf
chPathClose chan *path chClosePath chan *path
chPathReady chan *path chPathReady chan *path
chPathNotReady chan *path chPathNotReady chan *path
chGetConfForPath chan pathGetConfForPathReq chGetConfForPath chan pathGetConfForPathReq
chDescribe chan pathDescribeReq chDescribe chan pathDescribeReq
chReaderAdd chan pathReaderAddReq chAddReader chan pathAddReaderReq
chPublisherAdd chan pathPublisherAddReq chAddPublisher chan pathAddPublisherReq
chHLSManagerSet chan pathManagerHLSManager chSetHLSManager chan pathManagerHLSManager
chAPIPathsList chan pathAPIPathsListReq chAPIPathsList chan pathAPIPathsListReq
chAPIPathsGet chan pathAPIPathsGetReq chAPIPathsGet chan pathAPIPathsGetReq
} }
@@ -128,15 +128,15 @@ func newPathManager(
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
paths: make(map[string]*path), paths: make(map[string]*path),
pathsByConf: make(map[string]map[*path]struct{}), pathsByConf: make(map[string]map[*path]struct{}),
chConfReload: make(chan map[string]*conf.PathConf), chReloadConf: make(chan map[string]*conf.PathConf),
chPathClose: make(chan *path), chClosePath: make(chan *path),
chPathReady: make(chan *path), chPathReady: make(chan *path),
chPathNotReady: make(chan *path), chPathNotReady: make(chan *path),
chGetConfForPath: make(chan pathGetConfForPathReq), chGetConfForPath: make(chan pathGetConfForPathReq),
chDescribe: make(chan pathDescribeReq), chDescribe: make(chan pathDescribeReq),
chReaderAdd: make(chan pathReaderAddReq), chAddReader: make(chan pathAddReaderReq),
chPublisherAdd: make(chan pathPublisherAddReq), chAddPublisher: make(chan pathAddPublisherReq),
chHLSManagerSet: make(chan pathManagerHLSManager), chSetHLSManager: make(chan pathManagerHLSManager),
chAPIPathsList: make(chan pathAPIPathsListReq), chAPIPathsList: make(chan pathAPIPathsListReq),
chAPIPathsGet: make(chan pathAPIPathsGetReq), chAPIPathsGet: make(chan pathAPIPathsGetReq),
} }
@@ -176,7 +176,7 @@ func (pm *pathManager) run() {
outer: outer:
for { for {
select { select {
case newPathConfs := <-pm.chConfReload: case newPathConfs := <-pm.chReloadConf:
for confName, pathConf := range pm.pathConfs { for confName, pathConf := range pm.pathConfs {
if newPathConf, ok := newPathConfs[confName]; ok { if newPathConf, ok := newPathConfs[confName]; ok {
// configuration has changed // configuration has changed
@@ -212,7 +212,7 @@ outer:
} }
} }
case pa := <-pm.chPathClose: case pa := <-pm.chClosePath:
if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa { if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa {
continue continue
} }
@@ -264,17 +264,17 @@ outer:
req.res <- pathDescribeRes{path: pm.paths[req.pathName]} req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
case req := <-pm.chReaderAdd: case req := <-pm.chAddReader:
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
if err != nil { if err != nil {
req.res <- pathReaderSetupPlayRes{err: err} req.res <- pathAddReaderRes{err: err}
continue continue
} }
if !req.skipAuth { if !req.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials)
if err != nil { if err != nil {
req.res <- pathReaderSetupPlayRes{err: err} req.res <- pathAddReaderRes{err: err}
continue continue
} }
} }
@@ -284,19 +284,19 @@ outer:
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
} }
req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]} req.res <- pathAddReaderRes{path: pm.paths[req.pathName]}
case req := <-pm.chPublisherAdd: case req := <-pm.chAddPublisher:
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
if err != nil { if err != nil {
req.res <- pathPublisherAddRes{err: err} req.res <- pathAddPublisherRes{err: err}
continue continue
} }
if !req.skipAuth { if !req.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials)
if err != nil { if err != nil {
req.res <- pathPublisherAddRes{err: err} req.res <- pathAddPublisherRes{err: err}
continue continue
} }
} }
@@ -306,9 +306,9 @@ outer:
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
} }
req.res <- pathPublisherAddRes{path: pm.paths[req.pathName]} req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]}
case s := <-pm.chHLSManagerSet: case s := <-pm.chSetHLSManager:
pm.hlsManager = s pm.hlsManager = s
case req := <-pm.chAPIPathsList: case req := <-pm.chAPIPathsList:
@@ -381,7 +381,7 @@ func (pm *pathManager) removePath(pa *path) {
// confReload is called by core. // confReload is called by core.
func (pm *pathManager) confReload(pathConfs map[string]*conf.PathConf) { func (pm *pathManager) confReload(pathConfs map[string]*conf.PathConf) {
select { select {
case pm.chConfReload <- pathConfs: case pm.chReloadConf <- pathConfs:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
@@ -404,10 +404,10 @@ func (pm *pathManager) pathNotReady(pa *path) {
} }
} }
// onPathClose is called by path. // closePath is called by path.
func (pm *pathManager) onPathClose(pa *path) { func (pm *pathManager) closePath(pa *path) {
select { select {
case pm.chPathClose <- pa: case pm.chClosePath <- pa:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
case <-pa.ctx.Done(): // in case pathManager is blocked by path.wait() case <-pa.ctx.Done(): // in case pathManager is blocked by path.wait()
} }
@@ -448,44 +448,44 @@ func (pm *pathManager) describe(req pathDescribeReq) pathDescribeRes {
} }
} }
// publisherAnnounce is called by a publisher. // addPublisher is called by a publisher.
func (pm *pathManager) publisherAdd(req pathPublisherAddReq) pathPublisherAddRes { func (pm *pathManager) addPublisher(req pathAddPublisherReq) pathAddPublisherRes {
req.res = make(chan pathPublisherAddRes) req.res = make(chan pathAddPublisherRes)
select { select {
case pm.chPublisherAdd <- req: case pm.chAddPublisher <- req:
res := <-req.res res := <-req.res
if res.err != nil { if res.err != nil {
return res return res
} }
return res.path.publisherAdd(req) return res.path.addPublisher(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathPublisherAddRes{err: fmt.Errorf("terminated")} return pathAddPublisherRes{err: fmt.Errorf("terminated")}
} }
} }
// readerSetupPlay is called by a reader. // addReader is called by a reader.
func (pm *pathManager) readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes { func (pm *pathManager) addReader(req pathAddReaderReq) pathAddReaderRes {
req.res = make(chan pathReaderSetupPlayRes) req.res = make(chan pathAddReaderRes)
select { select {
case pm.chReaderAdd <- req: case pm.chAddReader <- req:
res := <-req.res res := <-req.res
if res.err != nil { if res.err != nil {
return res return res
} }
return res.path.readerAdd(req) return res.path.addReader(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} return pathAddReaderRes{err: fmt.Errorf("terminated")}
} }
} }
// hlsManagerSet is called by hlsManager. // setHLSManager is called by hlsManager.
func (pm *pathManager) hlsManagerSet(s pathManagerHLSManager) { func (pm *pathManager) setHLSManager(s pathManagerHLSManager) {
select { select {
case pm.chHLSManagerSet <- s: case pm.chSetHLSManager <- s:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }

View File

@@ -47,8 +47,8 @@ const (
) )
type rtmpConnPathManager interface { type rtmpConnPathManager interface {
readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes addReader(req pathAddReaderReq) pathAddReaderRes
publisherAdd(req pathPublisherAddReq) pathPublisherAddRes addPublisher(req pathAddPublisherReq) pathAddPublisherRes
} }
type rtmpConnParent interface { type rtmpConnParent interface {
@@ -208,7 +208,7 @@ func (c *rtmpConn) runReader() error {
func (c *rtmpConn) runRead(u *url.URL) error { func (c *rtmpConn) runRead(u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u) pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.readerAdd(pathReaderAddReq{ res := c.pathManager.addReader(pathAddReaderReq{
author: c, author: c,
pathName: pathName, pathName: pathName,
credentials: authCredentials{ credentials: authCredentials{
@@ -230,7 +230,7 @@ func (c *rtmpConn) runRead(u *url.URL) error {
return res.err return res.err
} }
defer res.path.readerRemove(pathReaderRemoveReq{author: c}) defer res.path.removeReader(pathRemoveReaderReq{author: c})
c.mutex.Lock() c.mutex.Lock()
c.state = rtmpConnStateRead c.state = rtmpConnStateRead
@@ -572,7 +572,7 @@ func (c *rtmpConn) setupAudio(
func (c *rtmpConn) runPublish(u *url.URL) error { func (c *rtmpConn) runPublish(u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u) pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.publisherAdd(pathPublisherAddReq{ res := c.pathManager.addPublisher(pathAddPublisherReq{
author: c, author: c,
pathName: pathName, pathName: pathName,
credentials: authCredentials{ credentials: authCredentials{
@@ -594,7 +594,7 @@ func (c *rtmpConn) runPublish(u *url.URL) error {
return res.err return res.err
} }
defer res.path.publisherRemove(pathPublisherRemoveReq{author: c}) defer res.path.removePublisher(pathRemovePublisherReq{author: c})
c.mutex.Lock() c.mutex.Lock()
c.state = rtmpConnStatePublish c.state = rtmpConnStatePublish
@@ -685,7 +685,7 @@ func (c *rtmpConn) runPublish(u *url.URL) error {
} }
} }
rres := res.path.publisherStart(pathPublisherStartReq{ rres := res.path.startPublisher(pathStartPublisherReq{
author: c, author: c,
medias: medias, medias: medias,
generateRTPPackets: true, generateRTPPackets: true,

View File

@@ -21,8 +21,8 @@ import (
) )
type rtspSessionPathManager interface { type rtspSessionPathManager interface {
publisherAdd(req pathPublisherAddReq) pathPublisherAddRes addPublisher(req pathAddPublisherReq) pathAddPublisherRes
readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes addReader(req pathAddReaderReq) pathAddReaderRes
} }
type rtspSessionParent interface { type rtspSessionParent interface {
@@ -100,10 +100,10 @@ func (s *rtspSession) onClose(err error) {
switch s.session.State() { switch s.session.State() {
case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay: case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay:
s.path.readerRemove(pathReaderRemoveReq{author: s}) s.path.removeReader(pathRemoveReaderReq{author: s})
case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord: case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord:
s.path.publisherRemove(pathPublisherRemoveReq{author: s}) s.path.removePublisher(pathRemovePublisherReq{author: s})
} }
s.path = nil s.path = nil
@@ -131,7 +131,7 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
} }
} }
res := s.pathManager.publisherAdd(pathPublisherAddReq{ res := s.pathManager.addPublisher(pathAddPublisherReq{
author: s, author: s,
pathName: ctx.Path, pathName: ctx.Path,
credentials: authCredentials{ credentials: authCredentials{
@@ -216,7 +216,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
} }
} }
res := s.pathManager.readerAdd(pathReaderAddReq{ res := s.pathManager.addReader(pathAddReaderReq{
author: s, author: s,
pathName: ctx.Path, pathName: ctx.Path,
credentials: authCredentials{ credentials: authCredentials{
@@ -304,7 +304,7 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
// onRecord is called by rtspServer. // onRecord is called by rtspServer.
func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.publisherStart(pathPublisherStartReq{ res := s.path.startPublisher(pathStartPublisherReq{
author: s, author: s,
medias: s.session.AnnouncedMedias(), medias: s.session.AnnouncedMedias(),
generateRTPPackets: false, generateRTPPackets: false,
@@ -356,7 +356,7 @@ func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Respo
s.mutex.Unlock() s.mutex.Unlock()
case gortsplib.ServerSessionStateRecord: case gortsplib.ServerSessionStateRecord:
s.path.publisherStop(pathPublisherStopReq{author: s}) s.path.stopPublisher(pathStopPublisherReq{author: s})
s.mutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord s.state = gortsplib.ServerSessionStatePreRecord

View File

@@ -162,8 +162,8 @@ func marshalICEFragment(offer *webrtc.SessionDescription, candidates []*webrtc.I
type webRTCHTTPServerParent interface { type webRTCHTTPServerParent interface {
logger.Writer logger.Writer
generateICEServers() ([]webrtc.ICEServer, error) generateICEServers() ([]webrtc.ICEServer, error)
sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes newSession(req webRTCNewSessionReq) webRTCNewSessionRes
sessionAddCandidates(req webRTCSessionAddCandidatesReq) webRTCSessionAddCandidatesRes addSessionCandidates(req webRTCAddSessionCandidatesReq) webRTCAddSessionCandidatesRes
} }
type webRTCHTTPServer struct { type webRTCHTTPServer struct {
@@ -372,7 +372,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
return return
} }
res := s.parent.sessionNew(webRTCSessionNewReq{ res := s.parent.newSession(webRTCNewSessionReq{
pathName: dir, pathName: dir,
remoteAddr: remoteAddr, remoteAddr: remoteAddr,
query: ctx.Request.URL.RawQuery, query: ctx.Request.URL.RawQuery,
@@ -425,7 +425,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
return return
} }
res := s.parent.sessionAddCandidates(webRTCSessionAddCandidatesReq{ res := s.parent.addSessionCandidates(webRTCAddSessionCandidatesReq{
secret: secret, secret: secret,
candidates: candidates, candidates: candidates,
}) })

View File

@@ -7,9 +7,10 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/stream"
) )
const ( const (

View File

@@ -112,14 +112,14 @@ type webRTCManagerAPISessionsGetReq struct {
res chan webRTCManagerAPISessionsGetRes res chan webRTCManagerAPISessionsGetRes
} }
type webRTCSessionNewRes struct { type webRTCNewSessionRes struct {
sx *webRTCSession sx *webRTCSession
answer []byte answer []byte
err error err error
errStatusCode int errStatusCode int
} }
type webRTCSessionNewReq struct { type webRTCNewSessionReq struct {
pathName string pathName string
remoteAddr string remoteAddr string
query string query string
@@ -127,18 +127,18 @@ type webRTCSessionNewReq struct {
pass string pass string
offer []byte offer []byte
publish bool publish bool
res chan webRTCSessionNewRes res chan webRTCNewSessionRes
} }
type webRTCSessionAddCandidatesRes struct { type webRTCAddSessionCandidatesRes struct {
sx *webRTCSession sx *webRTCSession
err error err error
} }
type webRTCSessionAddCandidatesReq struct { type webRTCAddSessionCandidatesReq struct {
secret uuid.UUID secret uuid.UUID
candidates []*webrtc.ICECandidateInit candidates []*webrtc.ICECandidateInit
res chan webRTCSessionAddCandidatesRes res chan webRTCAddSessionCandidatesRes
} }
type webRTCManagerParent interface { type webRTCManagerParent interface {
@@ -166,9 +166,9 @@ type webRTCManager struct {
iceTCPMux ice.TCPMux iceTCPMux ice.TCPMux
// in // in
chSessionNew chan webRTCSessionNewReq chNewSession chan webRTCNewSessionReq
chSessionClose chan *webRTCSession chCloseSession chan *webRTCSession
chSessionAddCandidates chan webRTCSessionAddCandidatesReq chAddSessionCandidates chan webRTCAddSessionCandidatesReq
chAPISessionsList chan webRTCManagerAPISessionsListReq chAPISessionsList chan webRTCManagerAPISessionsListReq
chAPISessionsGet chan webRTCManagerAPISessionsGetReq chAPISessionsGet chan webRTCManagerAPISessionsGetReq
chAPIConnsKick chan webRTCManagerAPISessionsKickReq chAPIConnsKick chan webRTCManagerAPISessionsKickReq
@@ -209,9 +209,9 @@ func newWebRTCManager(
iceHostNAT1To1IPs: iceHostNAT1To1IPs, iceHostNAT1To1IPs: iceHostNAT1To1IPs,
sessions: make(map[*webRTCSession]struct{}), sessions: make(map[*webRTCSession]struct{}),
sessionsBySecret: make(map[uuid.UUID]*webRTCSession), sessionsBySecret: make(map[uuid.UUID]*webRTCSession),
chSessionNew: make(chan webRTCSessionNewReq), chNewSession: make(chan webRTCNewSessionReq),
chSessionClose: make(chan *webRTCSession), chCloseSession: make(chan *webRTCSession),
chSessionAddCandidates: make(chan webRTCSessionAddCandidatesReq), chAddSessionCandidates: make(chan webRTCAddSessionCandidatesReq),
chAPISessionsList: make(chan webRTCManagerAPISessionsListReq), chAPISessionsList: make(chan webRTCManagerAPISessionsListReq),
chAPISessionsGet: make(chan webRTCManagerAPISessionsGetReq), chAPISessionsGet: make(chan webRTCManagerAPISessionsGetReq),
chAPIConnsKick: make(chan webRTCManagerAPISessionsKickReq), chAPIConnsKick: make(chan webRTCManagerAPISessionsKickReq),
@@ -293,7 +293,7 @@ func (m *webRTCManager) run() {
outer: outer:
for { for {
select { select {
case req := <-m.chSessionNew: case req := <-m.chNewSession:
sx := newWebRTCSession( sx := newWebRTCSession(
m.ctx, m.ctx,
m.readBufferCount, m.readBufferCount,
@@ -307,20 +307,20 @@ outer:
) )
m.sessions[sx] = struct{}{} m.sessions[sx] = struct{}{}
m.sessionsBySecret[sx.secret] = sx m.sessionsBySecret[sx.secret] = sx
req.res <- webRTCSessionNewRes{sx: sx} req.res <- webRTCNewSessionRes{sx: sx}
case sx := <-m.chSessionClose: case sx := <-m.chCloseSession:
delete(m.sessions, sx) delete(m.sessions, sx)
delete(m.sessionsBySecret, sx.secret) delete(m.sessionsBySecret, sx.secret)
case req := <-m.chSessionAddCandidates: case req := <-m.chAddSessionCandidates:
sx, ok := m.sessionsBySecret[req.secret] sx, ok := m.sessionsBySecret[req.secret]
if !ok { if !ok {
req.res <- webRTCSessionAddCandidatesRes{err: fmt.Errorf("session not found")} req.res <- webRTCAddSessionCandidatesRes{err: fmt.Errorf("session not found")}
continue continue
} }
req.res <- webRTCSessionAddCandidatesRes{sx: sx} req.res <- webRTCAddSessionCandidatesRes{sx: sx}
case req := <-m.chAPISessionsList: case req := <-m.chAPISessionsList:
data := &apiWebRTCSessionsList{ data := &apiWebRTCSessionsList{
@@ -417,36 +417,36 @@ func (m *webRTCManager) generateICEServers() ([]webrtc.ICEServer, error) {
return ret, nil return ret, nil
} }
// sessionNew is called by webRTCHTTPServer. // newSession is called by webRTCHTTPServer.
func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes { func (m *webRTCManager) newSession(req webRTCNewSessionReq) webRTCNewSessionRes {
req.res = make(chan webRTCSessionNewRes) req.res = make(chan webRTCNewSessionRes)
select { select {
case m.chSessionNew <- req: case m.chNewSession <- req:
res := <-req.res res := <-req.res
return res.sx.new(req) return res.sx.new(req)
case <-m.ctx.Done(): case <-m.ctx.Done():
return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} return webRTCNewSessionRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError}
} }
} }
// sessionClose is called by webRTCSession. // closeSession is called by webRTCSession.
func (m *webRTCManager) sessionClose(sx *webRTCSession) { func (m *webRTCManager) closeSession(sx *webRTCSession) {
select { select {
case m.chSessionClose <- sx: case m.chCloseSession <- sx:
case <-m.ctx.Done(): case <-m.ctx.Done():
} }
} }
// sessionAddCandidates is called by webRTCHTTPServer. // addSessionCandidates is called by webRTCHTTPServer.
func (m *webRTCManager) sessionAddCandidates( func (m *webRTCManager) addSessionCandidates(
req webRTCSessionAddCandidatesReq, req webRTCAddSessionCandidatesReq,
) webRTCSessionAddCandidatesRes { ) webRTCAddSessionCandidatesRes {
req.res = make(chan webRTCSessionAddCandidatesRes) req.res = make(chan webRTCAddSessionCandidatesRes)
select { select {
case m.chSessionAddCandidates <- req: case m.chAddSessionCandidates <- req:
res1 := <-req.res res1 := <-req.res
if res1.err != nil { if res1.err != nil {
return res1 return res1
@@ -455,7 +455,7 @@ func (m *webRTCManager) sessionAddCandidates(
return res1.sx.addCandidates(req) return res1.sx.addCandidates(req)
case <-m.ctx.Done(): case <-m.ctx.Done():
return webRTCSessionAddCandidatesRes{err: fmt.Errorf("terminated")} return webRTCAddSessionCandidatesRes{err: fmt.Errorf("terminated")}
} }
} }

View File

@@ -131,13 +131,13 @@ func gatherIncomingTracks(
} }
type webRTCSessionPathManager interface { type webRTCSessionPathManager interface {
publisherAdd(req pathPublisherAddReq) pathPublisherAddRes addPublisher(req pathAddPublisherReq) pathAddPublisherRes
readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes addReader(req pathAddReaderReq) pathAddReaderRes
} }
type webRTCSession struct { type webRTCSession struct {
readBufferCount int readBufferCount int
req webRTCSessionNewReq req webRTCNewSessionReq
wg *sync.WaitGroup wg *sync.WaitGroup
iceHostNAT1To1IPs []string iceHostNAT1To1IPs []string
iceUDPMux ice.UDPMux iceUDPMux ice.UDPMux
@@ -145,23 +145,22 @@ type webRTCSession struct {
pathManager webRTCSessionPathManager pathManager webRTCSessionPathManager
parent *webRTCManager parent *webRTCManager
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
created time.Time created time.Time
uuid uuid.UUID uuid uuid.UUID
secret uuid.UUID secret uuid.UUID
answerSent bool mutex sync.RWMutex
mutex sync.RWMutex pc *peerConnection
pc *peerConnection
chNew chan webRTCSessionNewReq chNew chan webRTCNewSessionReq
chAddCandidates chan webRTCSessionAddCandidatesReq chAddCandidates chan webRTCAddSessionCandidatesReq
} }
func newWebRTCSession( func newWebRTCSession(
parentCtx context.Context, parentCtx context.Context,
readBufferCount int, readBufferCount int,
req webRTCSessionNewReq, req webRTCNewSessionReq,
wg *sync.WaitGroup, wg *sync.WaitGroup,
iceHostNAT1To1IPs []string, iceHostNAT1To1IPs []string,
iceUDPMux ice.UDPMux, iceUDPMux ice.UDPMux,
@@ -185,8 +184,8 @@ func newWebRTCSession(
created: time.Now(), created: time.Now(),
uuid: uuid.New(), uuid: uuid.New(),
secret: uuid.New(), secret: uuid.New(),
chNew: make(chan webRTCSessionNewReq), chNew: make(chan webRTCNewSessionReq),
chAddCandidates: make(chan webRTCSessionAddCandidatesReq), chAddCandidates: make(chan webRTCAddSessionCandidatesReq),
} }
s.Log(logger.Info, "created by %s", req.remoteAddr) s.Log(logger.Info, "created by %s", req.remoteAddr)
@@ -213,7 +212,7 @@ func (s *webRTCSession) run() {
s.ctxCancel() s.ctxCancel()
s.parent.sessionClose(s) s.parent.closeSession(s)
s.Log(logger.Info, "closed (%v)", err) s.Log(logger.Info, "closed (%v)", err)
} }
@@ -221,16 +220,14 @@ func (s *webRTCSession) run() {
func (s *webRTCSession) runInner() error { func (s *webRTCSession) runInner() error {
select { select {
case <-s.chNew: case <-s.chNew:
// do not store the request, we already have it
case <-s.ctx.Done(): case <-s.ctx.Done():
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
errStatusCode, err := s.runInner2() errStatusCode, err := s.runInner2()
if !s.answerSent { if errStatusCode != 0 {
s.req.res <- webRTCSessionNewRes{ s.req.res <- webRTCNewSessionRes{
err: err, err: err,
errStatusCode: errStatusCode, errStatusCode: errStatusCode,
} }
@@ -249,7 +246,7 @@ func (s *webRTCSession) runInner2() (int, error) {
func (s *webRTCSession) runPublish() (int, error) { func (s *webRTCSession) runPublish() (int, error) {
ip, _, _ := net.SplitHostPort(s.req.remoteAddr) ip, _, _ := net.SplitHostPort(s.req.remoteAddr)
res := s.pathManager.publisherAdd(pathPublisherAddReq{ res := s.pathManager.addPublisher(pathAddPublisherReq{
author: s, author: s,
pathName: s.req.pathName, pathName: s.req.pathName,
credentials: authCredentials{ credentials: authCredentials{
@@ -272,7 +269,7 @@ func (s *webRTCSession) runPublish() (int, error) {
return http.StatusBadRequest, res.err return http.StatusBadRequest, res.err
} }
defer res.path.publisherRemove(pathPublisherRemoveReq{author: s}) defer res.path.removePublisher(pathRemovePublisherReq{author: s})
servers, err := s.parent.generateICEServers() servers, err := s.parent.generateICEServers()
if err != nil { if err != nil {
@@ -388,7 +385,7 @@ func (s *webRTCSession) runPublish() (int, error) {
} }
medias := mediasOfIncomingTracks(tracks) medias := mediasOfIncomingTracks(tracks)
rres := res.path.publisherStart(pathPublisherStartReq{ rres := res.path.startPublisher(pathStartPublisherReq{
author: s, author: s,
medias: medias, medias: medias,
generateRTPPackets: false, generateRTPPackets: false,
@@ -417,7 +414,7 @@ func (s *webRTCSession) runPublish() (int, error) {
func (s *webRTCSession) runRead() (int, error) { func (s *webRTCSession) runRead() (int, error) {
ip, _, _ := net.SplitHostPort(s.req.remoteAddr) ip, _, _ := net.SplitHostPort(s.req.remoteAddr)
res := s.pathManager.readerAdd(pathReaderAddReq{ res := s.pathManager.addReader(pathAddReaderReq{
author: s, author: s,
pathName: s.req.pathName, pathName: s.req.pathName,
credentials: authCredentials{ credentials: authCredentials{
@@ -444,7 +441,7 @@ func (s *webRTCSession) runRead() (int, error) {
return http.StatusBadRequest, res.err return http.StatusBadRequest, res.err
} }
defer res.path.readerRemove(pathReaderRemoveReq{author: s}) defer res.path.removeReader(pathRemoveReaderReq{author: s})
tracks, err := gatherOutgoingTracks(res.stream.Medias()) tracks, err := gatherOutgoingTracks(res.stream.Medias())
if err != nil { if err != nil {
@@ -569,11 +566,10 @@ func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error {
} }
func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) { func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) {
s.req.res <- webRTCSessionNewRes{ s.req.res <- webRTCNewSessionRes{
sx: s, sx: s,
answer: []byte(answer.SDP), answer: []byte(answer.SDP),
} }
s.answerSent = true
} }
func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) {
@@ -583,10 +579,10 @@ func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) {
for _, candidate := range req.candidates { for _, candidate := range req.candidates {
err := pc.AddICECandidate(*candidate) err := pc.AddICECandidate(*candidate)
if err != nil { if err != nil {
req.res <- webRTCSessionAddCandidatesRes{err: err} req.res <- webRTCAddSessionCandidatesRes{err: err}
} }
} }
req.res <- webRTCSessionAddCandidatesRes{} req.res <- webRTCAddSessionCandidatesRes{}
case <-s.ctx.Done(): case <-s.ctx.Done():
return return
@@ -595,26 +591,26 @@ func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) {
} }
// new is called by webRTCHTTPServer through webRTCManager. // new is called by webRTCHTTPServer through webRTCManager.
func (s *webRTCSession) new(req webRTCSessionNewReq) webRTCSessionNewRes { func (s *webRTCSession) new(req webRTCNewSessionReq) webRTCNewSessionRes {
select { select {
case s.chNew <- req: case s.chNew <- req:
return <-req.res return <-req.res
case <-s.ctx.Done(): case <-s.ctx.Done():
return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} return webRTCNewSessionRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError}
} }
} }
// addCandidates is called by webRTCHTTPServer through webRTCManager. // addCandidates is called by webRTCHTTPServer through webRTCManager.
func (s *webRTCSession) addCandidates( func (s *webRTCSession) addCandidates(
req webRTCSessionAddCandidatesReq, req webRTCAddSessionCandidatesReq,
) webRTCSessionAddCandidatesRes { ) webRTCAddSessionCandidatesRes {
select { select {
case s.chAddCandidates <- req: case s.chAddCandidates <- req:
return <-req.res return <-req.res
case <-s.ctx.Done(): case <-s.ctx.Done():
return webRTCSessionAddCandidatesRes{err: fmt.Errorf("terminated")} return webRTCAddSessionCandidatesRes{err: fmt.Errorf("terminated")}
} }
} }