remove "on" prefix from most communication functions between components

This commit is contained in:
aler9
2022-08-04 21:07:17 +02:00
parent ea528f47e1
commit 055e08ac6c
20 changed files with 384 additions and 384 deletions

View File

@@ -145,26 +145,26 @@ func loadConfPathData(ctx *gin.Context) (interface{}, error) {
} }
type apiPathManager interface { type apiPathManager interface {
onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes
} }
type apiRTSPServer interface { type apiRTSPServer interface {
onAPISessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes
onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes
} }
type apiRTMPServer interface { type apiRTMPServer interface {
onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes
onAPIConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes apiConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes
} }
type apiHLSServer interface { type apiHLSServer interface {
onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes
} }
type apiParent interface { type apiParent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
onAPIConfigSet(conf *conf.Conf) apiConfigSet(conf *conf.Conf)
} }
type api struct { type api struct {
@@ -303,7 +303,7 @@ func (a *api) onConfigSet(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.onAPIConfigSet(&newConf) go a.parent.apiConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
@@ -348,7 +348,7 @@ func (a *api) onConfigPathsAdd(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.onAPIConfigSet(&newConf) go a.parent.apiConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
@@ -391,7 +391,7 @@ func (a *api) onConfigPathsEdit(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.onAPIConfigSet(&newConf) go a.parent.apiConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
@@ -427,13 +427,13 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.onAPIConfigSet(&newConf) go a.parent.apiConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onPathsList(ctx *gin.Context) { func (a *api) onPathsList(ctx *gin.Context) {
res := a.pathManager.onAPIPathsList(pathAPIPathsListReq{}) res := a.pathManager.apiPathsList(pathAPIPathsListReq{})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -443,7 +443,7 @@ func (a *api) onPathsList(ctx *gin.Context) {
} }
func (a *api) onRTSPSessionsList(ctx *gin.Context) { func (a *api) onRTSPSessionsList(ctx *gin.Context) {
res := a.rtspServer.onAPISessionsList(rtspServerAPISessionsListReq{}) res := a.rtspServer.apiSessionsList(rtspServerAPISessionsListReq{})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -455,7 +455,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) {
func (a *api) onRTSPSessionsKick(ctx *gin.Context) { func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
id := ctx.Param("id") id := ctx.Param("id")
res := a.rtspServer.onAPISessionsKick(rtspServerAPISessionsKickReq{id: id}) res := a.rtspServer.apiSessionsKick(rtspServerAPISessionsKickReq{id: id})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound) ctx.AbortWithStatus(http.StatusNotFound)
return return
@@ -465,7 +465,7 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
} }
func (a *api) onRTSPSSessionsList(ctx *gin.Context) { func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
res := a.rtspsServer.onAPISessionsList(rtspServerAPISessionsListReq{}) res := a.rtspsServer.apiSessionsList(rtspServerAPISessionsListReq{})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -477,7 +477,7 @@ func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
id := ctx.Param("id") id := ctx.Param("id")
res := a.rtspsServer.onAPISessionsKick(rtspServerAPISessionsKickReq{id: id}) res := a.rtspsServer.apiSessionsKick(rtspServerAPISessionsKickReq{id: id})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound) ctx.AbortWithStatus(http.StatusNotFound)
return return
@@ -487,7 +487,7 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
} }
func (a *api) onRTMPConnsList(ctx *gin.Context) { func (a *api) onRTMPConnsList(ctx *gin.Context) {
res := a.rtmpServer.onAPIConnsList(rtmpServerAPIConnsListReq{}) res := a.rtmpServer.apiConnsList(rtmpServerAPIConnsListReq{})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -499,7 +499,7 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) {
func (a *api) onRTMPConnsKick(ctx *gin.Context) { func (a *api) onRTMPConnsKick(ctx *gin.Context) {
id := ctx.Param("id") id := ctx.Param("id")
res := a.rtmpServer.onAPIConnsKick(rtmpServerAPIConnsKickReq{id: id}) res := a.rtmpServer.apiConnsKick(rtmpServerAPIConnsKickReq{id: id})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound) ctx.AbortWithStatus(http.StatusNotFound)
return return
@@ -509,7 +509,7 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
} }
func (a *api) onHLSMuxersList(ctx *gin.Context) { func (a *api) onHLSMuxersList(ctx *gin.Context) {
res := a.hlsServer.onAPIHLSMuxersList(hlsServerAPIMuxersListReq{}) res := a.hlsServer.apiHLSMuxersList(hlsServerAPIMuxersListReq{})
if res.err != nil { if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -518,8 +518,8 @@ func (a *api) onHLSMuxersList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, res.data) ctx.JSON(http.StatusOK, res.data)
} }
// onConfReload is called by core. // confReload is called by core.
func (a *api) onConfReload(conf *conf.Conf) { func (a *api) confReload(conf *conf.Conf) {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
a.conf = conf a.conf = conf

View File

@@ -40,7 +40,7 @@ type Core struct {
confWatcher *confwatcher.ConfWatcher confWatcher *confwatcher.ConfWatcher
// in // in
apiConfigSet chan *conf.Conf chAPIConfigSet chan *conf.Conf
// out // out
done chan struct{} done chan struct{}
@@ -72,11 +72,11 @@ func New(args []string) (*Core, bool) {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
p := &Core{ p := &Core{
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
confPath: *argConfPath, confPath: *argConfPath,
apiConfigSet: make(chan *conf.Conf), chAPIConfigSet: make(chan *conf.Conf),
done: make(chan struct{}), done: make(chan struct{}),
} }
var err error var err error
@@ -148,7 +148,7 @@ outer:
break outer break outer
} }
case newConf := <-p.apiConfigSet: case newConf := <-p.chAPIConfigSet:
p.Log(logger.Info, "reloading configuration (API request)") p.Log(logger.Info, "reloading configuration (API request)")
err := p.reloadConf(newConf, true) err := p.reloadConf(newConf, true)
@@ -411,7 +411,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeMetrics { closeMetrics {
closePathManager = true closePathManager = true
} else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { } else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathManager.onConfReload(newConf.Paths) p.pathManager.confReload(newConf.Paths)
} }
closeRTSPServer := false closeRTSPServer := false
@@ -520,7 +520,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
p.api.close() p.api.close()
p.api = nil p.api = nil
} else if !calledByAPI { // avoid a loop } else if !calledByAPI { // avoid a loop
p.api.onConfReload(newConf) p.api.confReload(newConf)
} }
} }
@@ -577,10 +577,10 @@ func (p *Core) reloadConf(newConf *conf.Conf, calledByAPI bool) error {
return p.createResources(false) return p.createResources(false)
} }
// onAPIConfigSet is called by api. // apiConfigSet is called by api.
func (p *Core) onAPIConfigSet(conf *conf.Conf) { func (p *Core) apiConfigSet(conf *conf.Conf) {
select { select {
case p.apiConfigSet <- conf: case p.chAPIConfigSet <- conf:
case <-p.ctx.Done(): case <-p.ctx.Done():
} }
} }

View File

@@ -219,7 +219,7 @@ func TestCorePathAutoDeletion(t *testing.T) {
} }
}() }()
res := p.pathManager.onAPIPathsList(pathAPIPathsListReq{}) res := p.pathManager.apiPathsList(pathAPIPathsListReq{})
require.NoError(t, res.err) require.NoError(t, res.err)
require.Equal(t, 0, len(res.data.Items)) require.Equal(t, 0, len(res.data.Items))

View File

@@ -102,12 +102,12 @@ type hlsMuxerRequest struct {
} }
type hlsMuxerPathManager interface { type hlsMuxerPathManager interface {
onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
} }
type hlsMuxerParent interface { type hlsMuxerParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onMuxerClose(*hlsMuxer) muxerClose(*hlsMuxer)
} }
type hlsMuxer struct { type hlsMuxer struct {
@@ -134,8 +134,8 @@ type hlsMuxer struct {
requests []*hlsMuxerRequest requests []*hlsMuxerRequest
// in // in
request chan *hlsMuxerRequest chRequest chan *hlsMuxerRequest
hlsServerAPIMuxersList chan hlsServerAPIMuxersListSubReq chAPIHLSMuxersList chan hlsServerAPIMuxersListSubReq
} }
func newHLSMuxer( func newHLSMuxer(
@@ -177,8 +177,8 @@ func newHLSMuxer(
v := time.Now().Unix() v := time.Now().Unix()
return &v return &v
}(), }(),
request: make(chan *hlsMuxerRequest), chRequest: make(chan *hlsMuxerRequest),
hlsServerAPIMuxersList: make(chan hlsServerAPIMuxersListSubReq), chAPIHLSMuxersList: make(chan hlsServerAPIMuxersListSubReq),
} }
if req != nil { if req != nil {
@@ -231,14 +231,14 @@ func (m *hlsMuxer) run() {
<-innerErr <-innerErr
return errors.New("terminated") return errors.New("terminated")
case req := <-m.request: case req := <-m.chRequest:
if isReady { if isReady {
req.res <- m.handleRequest(req) req.res <- m.handleRequest(req)
} else { } else {
m.requests = append(m.requests, req) m.requests = append(m.requests, req)
} }
case req := <-m.hlsServerAPIMuxersList: case req := <-m.chAPIHLSMuxersList:
req.data.Items[m.name] = hlsServerAPIMuxersListItem{ req.data.Items[m.name] = hlsServerAPIMuxersListItem{
LastRequest: time.Unix(atomic.LoadInt64(m.lastRequestTime), 0).String(), LastRequest: time.Unix(atomic.LoadInt64(m.lastRequestTime), 0).String(),
} }
@@ -266,13 +266,13 @@ func (m *hlsMuxer) run() {
} }
} }
m.parent.onMuxerClose(m) m.parent.muxerClose(m)
m.log(logger.Info, "destroyed (%v)", err) m.log(logger.Info, "destroyed (%v)", err)
} }
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := m.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{ res := m.pathManager.readerSetupPlay(pathReaderSetupPlayReq{
author: m, author: m,
pathName: m.pathName, pathName: m.pathName,
authenticate: nil, authenticate: nil,
@@ -284,7 +284,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.path = res.path m.path = res.path
defer func() { defer func() {
m.path.onReaderRemove(pathReaderRemoveReq{author: m}) m.path.readerRemove(pathReaderRemoveReq{author: m})
}() }()
var videoTrack *gortsplib.TrackH264 var videoTrack *gortsplib.TrackH264
@@ -343,7 +343,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount)) m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount))
m.path.onReaderPlay(pathReaderPlayReq{author: m}) m.path.readerPlay(pathReaderPlayReq{author: m})
writerDone := make(chan error) writerDone := make(chan error)
go func() { go func() {
@@ -510,10 +510,10 @@ func (m *hlsMuxer) authenticate(ctx *gin.Context) error {
return nil return nil
} }
// onRequest is called by hlsserver.Server (forwarded from ServeHTTP). // request is called by hlsserver.Server (forwarded from ServeHTTP).
func (m *hlsMuxer) onRequest(req *hlsMuxerRequest) { func (m *hlsMuxer) request(req *hlsMuxerRequest) {
select { select {
case m.request <- req: case m.chRequest <- req:
case <-m.ctx.Done(): case <-m.ctx.Done():
req.res <- func() *hls.MuxerFileResponse { req.res <- func() *hls.MuxerFileResponse {
return &hls.MuxerFileResponse{Status: http.StatusInternalServerError} return &hls.MuxerFileResponse{Status: http.StatusInternalServerError}
@@ -521,6 +521,17 @@ func (m *hlsMuxer) onRequest(req *hlsMuxerRequest) {
} }
} }
// apiHLSMuxersList is called by api.
func (m *hlsMuxer) apiHLSMuxersList(req hlsServerAPIMuxersListSubReq) {
req.res = make(chan struct{})
select {
case m.chAPIHLSMuxersList <- req:
<-req.res
case <-m.ctx.Done():
}
}
// onReaderAccepted implements reader. // onReaderAccepted implements reader.
func (m *hlsMuxer) onReaderAccepted() { func (m *hlsMuxer) onReaderAccepted() {
m.log(logger.Info, "is converting into HLS") m.log(logger.Info, "is converting into HLS")
@@ -531,20 +542,9 @@ func (m *hlsMuxer) onReaderData(data *data) {
m.ringBuffer.Push(data) m.ringBuffer.Push(data)
} }
// onReaderAPIDescribe implements reader. // apiReaderDescribe implements reader.
func (m *hlsMuxer) onReaderAPIDescribe() interface{} { func (m *hlsMuxer) apiReaderDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"hlsMuxer"} }{"hlsMuxer"}
} }
// onAPIHLSMuxersList is called by api.
func (m *hlsMuxer) onAPIHLSMuxersList(req hlsServerAPIMuxersListSubReq) {
req.res = make(chan struct{})
select {
case m.hlsServerAPIMuxersList <- req:
<-req.res
case <-m.ctx.Done():
}
}

View File

@@ -76,11 +76,11 @@ type hlsServer struct {
muxers map[string]*hlsMuxer muxers map[string]*hlsMuxer
// in // in
pathSourceReady chan *path chPathSourceReady chan *path
pathSourceNotReady chan *path chPathSourceNotReady chan *path
request chan *hlsMuxerRequest request chan *hlsMuxerRequest
muxerClose chan *hlsMuxer chMuxerClose chan *hlsMuxer
apiMuxersList chan hlsServerAPIMuxersListReq chAPIMuxerList chan hlsServerAPIMuxersListReq
} }
func newHLSServer( func newHLSServer(
@@ -142,19 +142,19 @@ func newHLSServer(
ln: ln, ln: ln,
tlsConfig: tlsConfig, tlsConfig: tlsConfig,
muxers: make(map[string]*hlsMuxer), muxers: make(map[string]*hlsMuxer),
pathSourceReady: make(chan *path), chPathSourceReady: make(chan *path),
pathSourceNotReady: make(chan *path), chPathSourceNotReady: make(chan *path),
request: make(chan *hlsMuxerRequest), request: make(chan *hlsMuxerRequest),
muxerClose: make(chan *hlsMuxer), chMuxerClose: make(chan *hlsMuxer),
apiMuxersList: make(chan hlsServerAPIMuxersListReq), chAPIMuxerList: make(chan hlsServerAPIMuxersListReq),
} }
s.log(logger.Info, "listener opened on "+address) s.log(logger.Info, "listener opened on "+address)
s.pathManager.onHLSServerSet(s) s.pathManager.hlsServerSet(s)
if s.metrics != nil { if s.metrics != nil {
s.metrics.onHLSServerSet(s) s.metrics.hlsServerSet(s)
} }
s.wg.Add(1) s.wg.Add(1)
@@ -201,12 +201,12 @@ func (s *hlsServer) run() {
outer: outer:
for { for {
select { select {
case pa := <-s.pathSourceReady: case pa := <-s.chPathSourceReady:
if s.hlsAlwaysRemux { if s.hlsAlwaysRemux {
s.findOrCreateMuxer(pa.Name(), "", nil) s.findOrCreateMuxer(pa.Name(), "", nil)
} }
case pa := <-s.pathSourceNotReady: case pa := <-s.chPathSourceNotReady:
if s.hlsAlwaysRemux { if s.hlsAlwaysRemux {
c, ok := s.muxers[pa.Name()] c, ok := s.muxers[pa.Name()]
if ok { if ok {
@@ -218,7 +218,7 @@ outer:
case req := <-s.request: case req := <-s.request:
s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req) s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req)
case c := <-s.muxerClose: case c := <-s.chMuxerClose:
if c2, ok := s.muxers[c.PathName()]; !ok || c2 != c { if c2, ok := s.muxers[c.PathName()]; !ok || c2 != c {
continue continue
} }
@@ -228,7 +228,7 @@ outer:
s.findOrCreateMuxer(c.PathName(), "", nil) s.findOrCreateMuxer(c.PathName(), "", nil)
} }
case req := <-s.apiMuxersList: case req := <-s.chAPIMuxerList:
muxers := make(map[string]*hlsMuxer) muxers := make(map[string]*hlsMuxer)
for name, m := range s.muxers { for name, m := range s.muxers {
@@ -248,10 +248,10 @@ outer:
hs.Shutdown(context.Background()) hs.Shutdown(context.Background())
s.pathManager.onHLSServerSet(nil) s.pathManager.hlsServerSet(nil)
if s.metrics != nil { if s.metrics != nil {
s.metrics.onHLSServerSet(nil) s.metrics.hlsServerSet(nil)
} }
} }
@@ -359,40 +359,40 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h
s) s)
s.muxers[pathName] = r s.muxers[pathName] = r
} else if req != nil { } else if req != nil {
r.onRequest(req) r.request(req)
} }
return r return r
} }
// onMuxerClose is called by hlsMuxer. // muxerClose is called by hlsMuxer.
func (s *hlsServer) onMuxerClose(c *hlsMuxer) { func (s *hlsServer) muxerClose(c *hlsMuxer) {
select { select {
case s.muxerClose <- c: case s.chMuxerClose <- c:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
// onPathSourceReady is called by pathManager. // pathSourceReady is called by pathManager.
func (s *hlsServer) onPathSourceReady(pa *path) { func (s *hlsServer) pathSourceReady(pa *path) {
select { select {
case s.pathSourceReady <- pa: case s.chPathSourceReady <- pa:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
// onPathSourceNotReady is called by pathManager. // pathSourceNotReady is called by pathManager.
func (s *hlsServer) onPathSourceNotReady(pa *path) { func (s *hlsServer) pathSourceNotReady(pa *path) {
select { select {
case s.pathSourceNotReady <- pa: case s.chPathSourceNotReady <- pa:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
// onAPIHLSMuxersList is called by api. // apiHLSMuxersList is called by api.
func (s *hlsServer) onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes { func (s *hlsServer) apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes {
req.res = make(chan hlsServerAPIMuxersListRes) req.res = make(chan hlsServerAPIMuxersListRes)
select { select {
case s.apiMuxersList <- req: case s.chAPIMuxerList <- req:
res := <-req.res res := <-req.res
res.data = &hlsServerAPIMuxersListData{ res.data = &hlsServerAPIMuxersListData{
@@ -400,7 +400,7 @@ func (s *hlsServer) onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerA
} }
for _, pa := range res.muxers { for _, pa := range res.muxers {
pa.onAPIHLSMuxersList(hlsServerAPIMuxersListSubReq{data: res.data}) pa.apiHLSMuxersList(hlsServerAPIMuxersListSubReq{data: res.data})
} }
return res return res

View File

@@ -15,8 +15,8 @@ import (
type hlsSourceParent interface { type hlsSourceParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
} }
type hlsSource struct { type hlsSource struct {
@@ -51,7 +51,7 @@ func (s *hlsSource) run(ctx context.Context) error {
defer func() { defer func() {
if stream != nil { if stream != nil {
s.parent.onSourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
} }
}() }()
@@ -78,7 +78,7 @@ func (s *hlsSource) run(ctx context.Context) error {
tracks = append(tracks, audioTrack) tracks = append(tracks, audioTrack)
} }
res := s.parent.onSourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks}) res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
if res.err != nil { if res.err != nil {
return res.err return res.err
} }
@@ -161,8 +161,8 @@ func (s *hlsSource) run(ctx context.Context) error {
} }
} }
// onSourceAPIDescribe implements sourceStaticImpl. // apiSourceDescribe implements sourceStaticImpl.
func (*hlsSource) onSourceAPIDescribe() interface{} { func (*hlsSource) apiSourceDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"hlsSource"} }{"hlsSource"}

View File

@@ -18,19 +18,19 @@ func metric(key string, value int64) string {
} }
type metricsPathManager interface { type metricsPathManager interface {
onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes
} }
type metricsRTSPServer interface { type metricsRTSPServer interface {
onAPISessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes
} }
type metricsRTMPServer interface { type metricsRTMPServer interface {
onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes
} }
type metricsHLSServer interface { type metricsHLSServer interface {
onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes
} }
type metricsParent interface { type metricsParent interface {
@@ -96,7 +96,7 @@ func (m *metrics) run() {
func (m *metrics) onMetrics(ctx *gin.Context) { func (m *metrics) onMetrics(ctx *gin.Context) {
out := "" out := ""
res := m.pathManager.onAPIPathsList(pathAPIPathsListReq{}) res := m.pathManager.apiPathsList(pathAPIPathsListReq{})
if res.err == nil { if res.err == nil {
for name, p := range res.data.Items { for name, p := range res.data.Items {
if p.SourceReady { if p.SourceReady {
@@ -108,7 +108,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
} }
if !interfaceIsEmpty(m.rtspServer) { if !interfaceIsEmpty(m.rtspServer) {
res := m.rtspServer.onAPISessionsList(rtspServerAPISessionsListReq{}) res := m.rtspServer.apiSessionsList(rtspServerAPISessionsListReq{})
if res.err == nil { if res.err == nil {
idleCount := int64(0) idleCount := int64(0)
readCount := int64(0) readCount := int64(0)
@@ -135,7 +135,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
} }
if !interfaceIsEmpty(m.rtspsServer) { if !interfaceIsEmpty(m.rtspsServer) {
res := m.rtspsServer.onAPISessionsList(rtspServerAPISessionsListReq{}) res := m.rtspsServer.apiSessionsList(rtspServerAPISessionsListReq{})
if res.err == nil { if res.err == nil {
idleCount := int64(0) idleCount := int64(0)
readCount := int64(0) readCount := int64(0)
@@ -162,7 +162,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
} }
if !interfaceIsEmpty(m.rtmpServer) { if !interfaceIsEmpty(m.rtmpServer) {
res := m.rtmpServer.onAPIConnsList(rtmpServerAPIConnsListReq{}) res := m.rtmpServer.apiConnsList(rtmpServerAPIConnsListReq{})
if res.err == nil { if res.err == nil {
idleCount := int64(0) idleCount := int64(0)
readCount := int64(0) readCount := int64(0)
@@ -189,7 +189,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
} }
if !interfaceIsEmpty(m.hlsServer) { if !interfaceIsEmpty(m.hlsServer) {
res := m.hlsServer.onAPIHLSMuxersList(hlsServerAPIMuxersListReq{}) res := m.hlsServer.apiHLSMuxersList(hlsServerAPIMuxersListReq{})
if res.err == nil { if res.err == nil {
for name := range res.data.Items { for name := range res.data.Items {
out += metric("hls_muxers{name=\""+name+"\"}", 1) out += metric("hls_muxers{name=\""+name+"\"}", 1)
@@ -201,36 +201,36 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
io.WriteString(ctx.Writer, out) io.WriteString(ctx.Writer, out)
} }
// onPathManagerSet is called by pathManager. // pathManagerSet is called by pathManager.
func (m *metrics) onPathManagerSet(s metricsPathManager) { func (m *metrics) pathManagerSet(s metricsPathManager) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.pathManager = s m.pathManager = s
} }
// onRTSPServer is called by rtspServer (plain). // rtspServerSet is called by rtspServer (plain).
func (m *metrics) onRTSPServerSet(s metricsRTSPServer) { func (m *metrics) rtspServerSet(s metricsRTSPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtspServer = s m.rtspServer = s
} }
// onRTSPServer is called by rtspServer (plain). // rtspsServerSet is called by rtspServer (tls).
func (m *metrics) onRTSPSServerSet(s metricsRTSPServer) { func (m *metrics) rtspsServerSet(s metricsRTSPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtspsServer = s m.rtspsServer = s
} }
// onRTMPServerSet is called by rtmpServer. // rtmpServerSet is called by rtmpServer.
func (m *metrics) onRTMPServerSet(s metricsRTMPServer) { func (m *metrics) rtmpServerSet(s metricsRTMPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtmpServer = s m.rtmpServer = s
} }
// onHLSServerSet is called by hlsServer. // hlsServerSet is called by hlsServer.
func (m *metrics) onHLSServerSet(s metricsHLSServer) { func (m *metrics) hlsServerSet(s metricsHLSServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.hlsServer = s m.hlsServer = s

View File

@@ -61,8 +61,8 @@ func (pathErrAuthCritical) Error() string {
type pathParent interface { type pathParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onPathSourceReady(*path) pathSourceReady(*path)
onPathSourceNotReady(*path) pathSourceNotReady(*path)
onPathClose(*path) onPathClose(*path)
} }
@@ -221,7 +221,7 @@ type path struct {
sourceReady bool sourceReady bool
stream *stream stream *stream
readers map[reader]pathReaderState readers map[reader]pathReaderState
describeRequestsOnHold []pathDescribeReq chDescribeRequestsOnHold []pathDescribeReq
setupPlayRequestsOnHold []pathReaderSetupPlayReq setupPlayRequestsOnHold []pathReaderSetupPlayReq
onDemandCmd *externalcmd.Cmd onDemandCmd *externalcmd.Cmd
onReadyCmd *externalcmd.Cmd onReadyCmd *externalcmd.Cmd
@@ -233,18 +233,18 @@ type path struct {
onDemandPublisherCloseTimer *time.Timer onDemandPublisherCloseTimer *time.Timer
// in // in
sourceStaticSetReady chan pathSourceStaticSetReadyReq chSourceStaticSetReady chan pathSourceStaticSetReadyReq
sourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq chSourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq
describe chan pathDescribeReq chDescribe chan pathDescribeReq
publisherRemove chan pathPublisherRemoveReq chPublisherRemove chan pathPublisherRemoveReq
publisherAnnounce chan pathPublisherAnnounceReq chPublisherAnnounce chan pathPublisherAnnounceReq
publisherRecord chan pathPublisherRecordReq chPublisherRecord chan pathPublisherRecordReq
publisherPause chan pathPublisherPauseReq chPublisherPause chan pathPublisherPauseReq
readerRemove chan pathReaderRemoveReq chReaderRemove chan pathReaderRemoveReq
readerSetupPlay chan pathReaderSetupPlayReq chReaderSetupPlay chan pathReaderSetupPlayReq
readerPlay chan pathReaderPlayReq chReaderPlay chan pathReaderPlayReq
readerPause chan pathReaderPauseReq chReaderPause chan pathReaderPauseReq
apiPathsList chan pathAPIPathsListSubReq chAPIPathsList chan pathAPIPathsListSubReq
} }
func newPath( func newPath(
@@ -282,18 +282,18 @@ func newPath(
onDemandStaticSourceCloseTimer: newEmptyTimer(), onDemandStaticSourceCloseTimer: newEmptyTimer(),
onDemandPublisherReadyTimer: newEmptyTimer(), onDemandPublisherReadyTimer: newEmptyTimer(),
onDemandPublisherCloseTimer: newEmptyTimer(), onDemandPublisherCloseTimer: newEmptyTimer(),
sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq), chSourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq), chSourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
describe: make(chan pathDescribeReq), chDescribe: make(chan pathDescribeReq),
publisherRemove: make(chan pathPublisherRemoveReq), chPublisherRemove: make(chan pathPublisherRemoveReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq), chPublisherAnnounce: make(chan pathPublisherAnnounceReq),
publisherRecord: make(chan pathPublisherRecordReq), chPublisherRecord: make(chan pathPublisherRecordReq),
publisherPause: make(chan pathPublisherPauseReq), chPublisherPause: make(chan pathPublisherPauseReq),
readerRemove: make(chan pathReaderRemoveReq), chReaderRemove: make(chan pathReaderRemoveReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq), chReaderSetupPlay: make(chan pathReaderSetupPlayReq),
readerPlay: make(chan pathReaderPlayReq), chReaderPlay: make(chan pathReaderPlayReq),
readerPause: make(chan pathReaderPauseReq), chReaderPause: make(chan pathReaderPauseReq),
apiPathsList: make(chan pathAPIPathsListSubReq), chAPIPathsList: make(chan pathAPIPathsListSubReq),
} }
pa.log(logger.Debug, "created") pa.log(logger.Debug, "created")
@@ -382,10 +382,10 @@ func (pa *path) run() {
for { for {
select { select {
case <-pa.onDemandStaticSourceReadyTimer.C: case <-pa.onDemandStaticSourceReadyTimer.C:
for _, req := range pa.describeRequestsOnHold { for _, req := range pa.chDescribeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
} }
pa.describeRequestsOnHold = nil pa.chDescribeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold { for _, req := range pa.setupPlayRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
@@ -407,10 +407,10 @@ func (pa *path) run() {
} }
case <-pa.onDemandPublisherReadyTimer.C: case <-pa.onDemandPublisherReadyTimer.C:
for _, req := range pa.describeRequestsOnHold { for _, req := range pa.chDescribeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
} }
pa.describeRequestsOnHold = nil pa.chDescribeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold { for _, req := range pa.setupPlayRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
@@ -430,7 +430,7 @@ func (pa *path) run() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.sourceStaticSetReady: case req := <-pa.chSourceStaticSetReady:
pa.sourceSetReady(req.tracks) pa.sourceSetReady(req.tracks)
if pa.hasOnDemandStaticSource() { if pa.hasOnDemandStaticSource() {
@@ -439,12 +439,12 @@ func (pa *path) run() {
pa.onDemandStaticSourceScheduleClose() pa.onDemandStaticSourceScheduleClose()
for _, req := range pa.describeRequestsOnHold { for _, req := range pa.chDescribeRequestsOnHold {
req.res <- pathDescribeRes{ req.res <- pathDescribeRes{
stream: pa.stream, stream: pa.stream,
} }
} }
pa.describeRequestsOnHold = nil pa.chDescribeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold { for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req) pa.handleReaderSetupPlayPost(req)
@@ -454,7 +454,7 @@ func (pa *path) run() {
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream} req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
case req := <-pa.sourceStaticSetNotReady: case req := <-pa.chSourceStaticSetNotReady:
pa.sourceSetNotReady() pa.sourceSetNotReady()
if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial { if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
@@ -467,50 +467,50 @@ func (pa *path) run() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.describe: case req := <-pa.chDescribe:
pa.handleDescribe(req) pa.handleDescribe(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.publisherRemove: case req := <-pa.chPublisherRemove:
pa.handlePublisherRemove(req) pa.handlePublisherRemove(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.publisherAnnounce: case req := <-pa.chPublisherAnnounce:
pa.handlePublisherAnnounce(req) pa.handlePublisherAnnounce(req)
case req := <-pa.publisherRecord: case req := <-pa.chPublisherRecord:
pa.handlePublisherRecord(req) pa.handlePublisherRecord(req)
case req := <-pa.publisherPause: case req := <-pa.chPublisherPause:
pa.handlePublisherPause(req) pa.handlePublisherPause(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.readerRemove: case req := <-pa.chReaderRemove:
pa.handleReaderRemove(req) pa.handleReaderRemove(req)
case req := <-pa.readerSetupPlay: case req := <-pa.chReaderSetupPlay:
pa.handleReaderSetupPlay(req) pa.handleReaderSetupPlay(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.readerPlay: case req := <-pa.chReaderPlay:
pa.handleReaderPlay(req) pa.handleReaderPlay(req)
case req := <-pa.readerPause: case req := <-pa.chReaderPause:
pa.handleReaderPause(req) pa.handleReaderPause(req)
case req := <-pa.apiPathsList: case req := <-pa.chAPIPathsList:
pa.handleAPIPathsList(req) pa.handleAPIPathsList(req)
case <-pa.ctx.Done(): case <-pa.ctx.Done():
@@ -531,7 +531,7 @@ func (pa *path) run() {
pa.log(logger.Info, "runOnInit command stopped") pa.log(logger.Info, "runOnInit command stopped")
} }
for _, req := range pa.describeRequestsOnHold { for _, req := range pa.chDescribeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("terminated")} req.res <- pathDescribeRes{err: fmt.Errorf("terminated")}
} }
@@ -565,7 +565,7 @@ func (pa *path) shouldClose() bool {
return pa.conf.Regexp != nil && return pa.conf.Regexp != nil &&
pa.source == nil && pa.source == nil &&
len(pa.readers) == 0 && len(pa.readers) == 0 &&
len(pa.describeRequestsOnHold) == 0 && len(pa.chDescribeRequestsOnHold) == 0 &&
len(pa.setupPlayRequestsOnHold) == 0 len(pa.setupPlayRequestsOnHold) == 0
} }
@@ -673,11 +673,11 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
}) })
} }
pa.parent.onPathSourceReady(pa) pa.parent.pathSourceReady(pa)
} }
func (pa *path) sourceSetNotReady() { func (pa *path) sourceSetNotReady() {
pa.parent.onPathSourceNotReady(pa) pa.parent.pathSourceNotReady(pa)
for r := range pa.readers { for r := range pa.readers {
pa.doReaderRemove(r) pa.doReaderRemove(r)
@@ -739,7 +739,7 @@ func (pa *path) handleDescribe(req pathDescribeReq) {
if pa.onDemandStaticSourceState == pathOnDemandStateInitial { if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
pa.onDemandStaticSourceStart() pa.onDemandStaticSourceStart()
} }
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) pa.chDescribeRequestsOnHold = append(pa.chDescribeRequestsOnHold, req)
return return
} }
@@ -747,7 +747,7 @@ func (pa *path) handleDescribe(req pathDescribeReq) {
if pa.onDemandPublisherState == pathOnDemandStateInitial { if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart() pa.onDemandPublisherStart()
} }
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) pa.chDescribeRequestsOnHold = append(pa.chDescribeRequestsOnHold, req)
return return
} }
@@ -818,12 +818,12 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
pa.onDemandPublisherScheduleClose() pa.onDemandPublisherScheduleClose()
for _, req := range pa.describeRequestsOnHold { for _, req := range pa.chDescribeRequestsOnHold {
req.res <- pathDescribeRes{ req.res <- pathDescribeRes{
stream: pa.stream, stream: pa.stream,
} }
} }
pa.describeRequestsOnHold = nil pa.chDescribeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold { for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req) pa.handleReaderSetupPlayPost(req)
@@ -938,13 +938,13 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
if pa.source == nil { if pa.source == nil {
return nil return nil
} }
return pa.source.onSourceAPIDescribe() return pa.source.apiSourceDescribe()
}(), }(),
SourceReady: pa.sourceReady, SourceReady: pa.sourceReady,
Readers: func() []interface{} { Readers: func() []interface{} {
ret := []interface{}{} ret := []interface{}{}
for r := range pa.readers { for r := range pa.readers {
ret = append(ret, r.onReaderAPIDescribe()) ret = append(ret, r.apiReaderDescribe())
} }
return ret return ret
}(), }(),
@@ -952,10 +952,10 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
close(req.res) close(req.res)
} }
// onSourceStaticSetReady is called by sourceStatic. // sourceStaticSetReady is called by sourceStatic.
func (pa *path) onSourceStaticSetReady(sourceStaticCtx context.Context, req pathSourceStaticSetReadyReq) { func (pa *path) sourceStaticSetReady(sourceStaticCtx context.Context, req pathSourceStaticSetReadyReq) {
select { select {
case pa.sourceStaticSetReady <- req: case pa.chSourceStaticSetReady <- req:
case <-pa.ctx.Done(): case <-pa.ctx.Done():
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")} req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
@@ -968,10 +968,10 @@ func (pa *path) onSourceStaticSetReady(sourceStaticCtx context.Context, req path
} }
} }
// onSourceStaticSetNotReady is called by sourceStatic. // sourceStaticSetNotReady is called by sourceStatic.
func (pa *path) onSourceStaticSetNotReady(sourceStaticCtx context.Context, req pathSourceStaticSetNotReadyReq) { func (pa *path) sourceStaticSetNotReady(sourceStaticCtx context.Context, req pathSourceStaticSetNotReadyReq) {
select { select {
case pa.sourceStaticSetNotReady <- req: case pa.chSourceStaticSetNotReady <- req:
case <-pa.ctx.Done(): case <-pa.ctx.Done():
close(req.res) close(req.res)
@@ -984,102 +984,102 @@ func (pa *path) onSourceStaticSetNotReady(sourceStaticCtx context.Context, req p
} }
} }
// onDescribe is called by a reader or publisher through pathManager. // describe is called by a reader or publisher through pathManager.
func (pa *path) onDescribe(req pathDescribeReq) pathDescribeRes { func (pa *path) describe(req pathDescribeReq) pathDescribeRes {
select { select {
case pa.describe <- req: case pa.chDescribe <- req:
return <-req.res return <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return pathDescribeRes{err: fmt.Errorf("terminated")} return pathDescribeRes{err: fmt.Errorf("terminated")}
} }
} }
// onPublisherRemove is called by a publisher. // publisherRemove is called by a publisher.
func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) { func (pa *path) publisherRemove(req pathPublisherRemoveReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.publisherRemove <- req: case pa.chPublisherRemove <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }
} }
// onPublisherAnnounce is called by a publisher through pathManager. // publisherAnnounce is called by a publisher through pathManager.
func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes { func (pa *path) publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
select { select {
case pa.publisherAnnounce <- req: case pa.chPublisherAnnounce <- req:
return <-req.res return <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")} return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
} }
} }
// onPublisherRecord is called by a publisher. // publisherRecord is called by a publisher.
func (pa *path) onPublisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes { func (pa *path) publisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes {
req.res = make(chan pathPublisherRecordRes) req.res = make(chan pathPublisherRecordRes)
select { select {
case pa.publisherRecord <- req: case pa.chPublisherRecord <- req:
return <-req.res return <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return pathPublisherRecordRes{err: fmt.Errorf("terminated")} return pathPublisherRecordRes{err: fmt.Errorf("terminated")}
} }
} }
// onPublisherPause is called by a publisher. // publisherPause is called by a publisher.
func (pa *path) onPublisherPause(req pathPublisherPauseReq) { func (pa *path) publisherPause(req pathPublisherPauseReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.publisherPause <- req: case pa.chPublisherPause <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }
} }
// onReaderRemove is called by a reader. // readerRemove is called by a reader.
func (pa *path) onReaderRemove(req pathReaderRemoveReq) { func (pa *path) readerRemove(req pathReaderRemoveReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.readerRemove <- req: case pa.chReaderRemove <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }
} }
// onReaderSetupPlay is called by a reader through pathManager. // readerSetupPlay is called by a reader through pathManager.
func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes { func (pa *path) readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
select { select {
case pa.readerSetupPlay <- req: case pa.chReaderSetupPlay <- req:
return <-req.res return <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
} }
} }
// onReaderPlay is called by a reader. // readerPlay is called by a reader.
func (pa *path) onReaderPlay(req pathReaderPlayReq) { func (pa *path) readerPlay(req pathReaderPlayReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.readerPlay <- req: case pa.chReaderPlay <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }
} }
// onReaderPause is called by a reader. // readerPause is called by a reader.
func (pa *path) onReaderPause(req pathReaderPauseReq) { func (pa *path) readerPause(req pathReaderPauseReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.readerPause <- req: case pa.chReaderPause <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():
} }
} }
// onAPIPathsList is called by api. // apiPathsList is called by api.
func (pa *path) onAPIPathsList(req pathAPIPathsListSubReq) { func (pa *path) apiPathsList(req pathAPIPathsListSubReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case pa.apiPathsList <- req: case pa.chAPIPathsList <- req:
<-req.res <-req.res
case <-pa.ctx.Done(): case <-pa.ctx.Done():

View File

@@ -11,8 +11,8 @@ import (
) )
type pathManagerHLSServer interface { type pathManagerHLSServer interface {
onPathSourceReady(*path) pathSourceReady(*path)
onPathSourceNotReady(*path) pathSourceNotReady(*path)
} }
type pathManagerParent interface { type pathManagerParent interface {
@@ -36,15 +36,15 @@ type pathManager struct {
paths map[string]*path paths map[string]*path
// in // in
confReload chan map[string]*conf.PathConf chConfReload chan map[string]*conf.PathConf
pathClose chan *path chPathClose chan *path
pathSourceReady chan *path chPathSourceReady chan *path
pathSourceNotReady chan *path chPathSourceNotReady chan *path
describe chan pathDescribeReq chDescribe chan pathDescribeReq
readerSetupPlay chan pathReaderSetupPlayReq chReaderSetupPlay chan pathReaderSetupPlayReq
publisherAnnounce chan pathPublisherAnnounceReq chPublisherAnnounce chan pathPublisherAnnounceReq
hlsServerSet chan pathManagerHLSServer chHLSServerSet chan pathManagerHLSServer
apiPathsList chan pathAPIPathsListReq chAPIPathsList chan pathAPIPathsListReq
} }
func newPathManager( func newPathManager(
@@ -61,26 +61,26 @@ func newPathManager(
ctx, ctxCancel := context.WithCancel(parentCtx) ctx, ctxCancel := context.WithCancel(parentCtx)
pm := &pathManager{ pm := &pathManager{
rtspAddress: rtspAddress, rtspAddress: rtspAddress,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
readBufferCount: readBufferCount, readBufferCount: readBufferCount,
pathConfs: pathConfs, pathConfs: pathConfs,
externalCmdPool: externalCmdPool, externalCmdPool: externalCmdPool,
metrics: metrics, metrics: metrics,
parent: parent, parent: parent,
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
paths: make(map[string]*path), paths: make(map[string]*path),
confReload: make(chan map[string]*conf.PathConf), chConfReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path), chPathClose: make(chan *path),
pathSourceReady: make(chan *path), chPathSourceReady: make(chan *path),
pathSourceNotReady: make(chan *path), chPathSourceNotReady: make(chan *path),
describe: make(chan pathDescribeReq), chDescribe: make(chan pathDescribeReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq), chReaderSetupPlay: make(chan pathReaderSetupPlayReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq), chPublisherAnnounce: make(chan pathPublisherAnnounceReq),
hlsServerSet: make(chan pathManagerHLSServer), chHLSServerSet: make(chan pathManagerHLSServer),
apiPathsList: make(chan pathAPIPathsListReq), chAPIPathsList: make(chan pathAPIPathsListReq),
} }
for pathConfName, pathConf := range pm.pathConfs { for pathConfName, pathConf := range pm.pathConfs {
@@ -90,7 +90,7 @@ func newPathManager(
} }
if pm.metrics != nil { if pm.metrics != nil {
pm.metrics.onPathManagerSet(pm) pm.metrics.pathManagerSet(pm)
} }
pm.log(logger.Debug, "path manager created") pm.log(logger.Debug, "path manager created")
@@ -118,7 +118,7 @@ func (pm *pathManager) run() {
outer: outer:
for { for {
select { select {
case pathConfs := <-pm.confReload: case pathConfs := <-pm.chConfReload:
// remove confs // remove confs
for pathConfName := range pm.pathConfs { for pathConfName := range pm.pathConfs {
if _, ok := pathConfs[pathConfName]; !ok { if _, ok := pathConfs[pathConfName]; !ok {
@@ -156,24 +156,24 @@ outer:
} }
} }
case pa := <-pm.pathClose: case pa := <-pm.chPathClose:
if pmpa, ok := pm.paths[pa.Name()]; !ok || pmpa != pa { if pmpa, ok := pm.paths[pa.Name()]; !ok || pmpa != pa {
continue continue
} }
delete(pm.paths, pa.Name()) delete(pm.paths, pa.Name())
pa.close() pa.close()
case pa := <-pm.pathSourceReady: case pa := <-pm.chPathSourceReady:
if pm.hlsServer != nil { if pm.hlsServer != nil {
pm.hlsServer.onPathSourceReady(pa) pm.hlsServer.pathSourceReady(pa)
} }
case pa := <-pm.pathSourceNotReady: case pa := <-pm.chPathSourceNotReady:
if pm.hlsServer != nil { if pm.hlsServer != nil {
pm.hlsServer.onPathSourceNotReady(pa) pm.hlsServer.pathSourceNotReady(pa)
} }
case req := <-pm.describe: case req := <-pm.chDescribe:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName) pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil { if err != nil {
req.res <- pathDescribeRes{err: err} req.res <- pathDescribeRes{err: err}
@@ -196,7 +196,7 @@ outer:
req.res <- pathDescribeRes{path: pm.paths[req.pathName]} req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
case req := <-pm.readerSetupPlay: case req := <-pm.chReaderSetupPlay:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName) pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil { if err != nil {
req.res <- pathReaderSetupPlayRes{err: err} req.res <- pathReaderSetupPlayRes{err: err}
@@ -221,7 +221,7 @@ outer:
req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]} req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]}
case req := <-pm.publisherAnnounce: case req := <-pm.chPublisherAnnounce:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName) pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil { if err != nil {
req.res <- pathPublisherAnnounceRes{err: err} req.res <- pathPublisherAnnounceRes{err: err}
@@ -244,10 +244,10 @@ outer:
req.res <- pathPublisherAnnounceRes{path: pm.paths[req.pathName]} req.res <- pathPublisherAnnounceRes{path: pm.paths[req.pathName]}
case s := <-pm.hlsServerSet: case s := <-pm.chHLSServerSet:
pm.hlsServer = s pm.hlsServer = s
case req := <-pm.apiPathsList: case req := <-pm.chAPIPathsList:
paths := make(map[string]*path) paths := make(map[string]*path)
for name, pa := range pm.paths { for name, pa := range pm.paths {
@@ -266,7 +266,7 @@ outer:
pm.ctxCancel() pm.ctxCancel()
if pm.metrics != nil { if pm.metrics != nil {
pm.metrics.onPathManagerSet(nil) pm.metrics.pathManagerSet(nil)
} }
} }
@@ -315,26 +315,26 @@ func (pm *pathManager) findPathConf(name string) (string, *conf.PathConf, []stri
return "", nil, nil, fmt.Errorf("path '%s' is not configured", name) return "", nil, nil, fmt.Errorf("path '%s' is not configured", name)
} }
// onConfReload is called by core. // confReload is called by core.
func (pm *pathManager) onConfReload(pathConfs map[string]*conf.PathConf) { func (pm *pathManager) confReload(pathConfs map[string]*conf.PathConf) {
select { select {
case pm.confReload <- pathConfs: case pm.chConfReload <- pathConfs:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// onPathSourceReady is called by path. // pathSourceReady is called by path.
func (pm *pathManager) onPathSourceReady(pa *path) { func (pm *pathManager) pathSourceReady(pa *path) {
select { select {
case pm.pathSourceReady <- pa: case pm.chPathSourceReady <- pa:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// onPathSourceNotReady is called by path. // pathSourceNotReady is called by path.
func (pm *pathManager) onPathSourceNotReady(pa *path) { func (pm *pathManager) pathSourceNotReady(pa *path) {
select { select {
case pm.pathSourceNotReady <- pa: case pm.chPathSourceNotReady <- pa:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
@@ -342,75 +342,75 @@ func (pm *pathManager) onPathSourceNotReady(pa *path) {
// onPathClose is called by path. // onPathClose is called by path.
func (pm *pathManager) onPathClose(pa *path) { func (pm *pathManager) onPathClose(pa *path) {
select { select {
case pm.pathClose <- pa: case pm.chPathClose <- pa:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// onDescribe is called by a reader or publisher. // describe is called by a reader or publisher.
func (pm *pathManager) onDescribe(req pathDescribeReq) pathDescribeRes { func (pm *pathManager) describe(req pathDescribeReq) pathDescribeRes {
req.res = make(chan pathDescribeRes) req.res = make(chan pathDescribeRes)
select { select {
case pm.describe <- req: case pm.chDescribe <- req:
res := <-req.res res := <-req.res
if res.err != nil { if res.err != nil {
return res return res
} }
return res.path.onDescribe(req) return res.path.describe(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathDescribeRes{err: fmt.Errorf("terminated")} return pathDescribeRes{err: fmt.Errorf("terminated")}
} }
} }
// onPublisherAnnounce is called by a publisher. // publisherAnnounce is called by a publisher.
func (pm *pathManager) onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes { func (pm *pathManager) publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
req.res = make(chan pathPublisherAnnounceRes) req.res = make(chan pathPublisherAnnounceRes)
select { select {
case pm.publisherAnnounce <- req: case pm.chPublisherAnnounce <- req:
res := <-req.res res := <-req.res
if res.err != nil { if res.err != nil {
return res return res
} }
return res.path.onPublisherAnnounce(req) return res.path.publisherAnnounce(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")} return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
} }
} }
// onReaderSetupPlay is called by a reader. // readerSetupPlay is called by a reader.
func (pm *pathManager) onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes { func (pm *pathManager) readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
req.res = make(chan pathReaderSetupPlayRes) req.res = make(chan pathReaderSetupPlayRes)
select { select {
case pm.readerSetupPlay <- req: case pm.chReaderSetupPlay <- req:
res := <-req.res res := <-req.res
if res.err != nil { if res.err != nil {
return res return res
} }
return res.path.onReaderSetupPlay(req) return res.path.readerSetupPlay(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
} }
} }
// onHLSServerSet is called by hlsServer. // hlsServerSet is called by hlsServer.
func (pm *pathManager) onHLSServerSet(s pathManagerHLSServer) { func (pm *pathManager) hlsServerSet(s pathManagerHLSServer) {
select { select {
case pm.hlsServerSet <- s: case pm.chHLSServerSet <- s:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// onAPIPathsList is called by api. // apiPathsList is called by api.
func (pm *pathManager) onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes { func (pm *pathManager) apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes {
req.res = make(chan pathAPIPathsListRes) req.res = make(chan pathAPIPathsListRes)
select { select {
case pm.apiPathsList <- req: case pm.chAPIPathsList <- req:
res := <-req.res res := <-req.res
res.data = &pathAPIPathsListData{ res.data = &pathAPIPathsListData{
@@ -418,7 +418,7 @@ func (pm *pathManager) onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListR
} }
for _, pa := range res.paths { for _, pa := range res.paths {
pa.onAPIPathsList(pathAPIPathsListSubReq{data: res.data}) pa.apiPathsList(pathAPIPathsListSubReq{data: res.data})
} }
return res return res

View File

@@ -5,5 +5,5 @@ type reader interface {
close() close()
onReaderAccepted() onReaderAccepted()
onReaderData(*data) onReaderData(*data)
onReaderAPIDescribe() interface{} apiReaderDescribe() interface{}
} }

View File

@@ -47,13 +47,13 @@ const (
) )
type rtmpConnPathManager interface { type rtmpConnPathManager interface {
onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
} }
type rtmpConnParent interface { type rtmpConnParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onConnClose(*rtmpConn) connClose(*rtmpConn)
} }
type rtmpConn struct { type rtmpConn struct {
@@ -199,7 +199,7 @@ func (c *rtmpConn) run() {
c.ctxCancel() c.ctxCancel()
c.parent.onConnClose(c) c.parent.connClose(c)
c.log(logger.Info, "closed (%v)", err) c.log(logger.Info, "closed (%v)", err)
} }
@@ -226,7 +226,7 @@ func (c *rtmpConn) runInner(ctx context.Context) error {
func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u) pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{ res := c.pathManager.readerSetupPlay(pathReaderSetupPlayReq{
author: c, author: c,
pathName: pathName, pathName: pathName,
authenticate: func( authenticate: func(
@@ -250,7 +250,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
c.path = res.path c.path = res.path
defer func() { defer func() {
c.path.onReaderRemove(pathReaderRemoveReq{author: c}) c.path.readerRemove(pathReaderRemoveReq{author: c})
}() }()
c.stateMutex.Lock() c.stateMutex.Lock()
@@ -307,7 +307,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
c.ringBuffer.Close() c.ringBuffer.Close()
}() }()
c.path.onReaderPlay(pathReaderPlayReq{ c.path.readerPlay(pathReaderPlayReq{
author: c, author: c,
}) })
@@ -519,7 +519,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u) pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.onPublisherAnnounce(pathPublisherAnnounceReq{ res := c.pathManager.publisherAnnounce(pathPublisherAnnounceReq{
author: c, author: c,
pathName: pathName, pathName: pathName,
authenticate: func( authenticate: func(
@@ -543,7 +543,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
c.path = res.path c.path = res.path
defer func() { defer func() {
c.path.onPublisherRemove(pathPublisherRemoveReq{author: c}) c.path.publisherRemove(pathPublisherRemoveReq{author: c})
}() }()
c.stateMutex.Lock() c.stateMutex.Lock()
@@ -553,7 +553,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
// disable write deadline // disable write deadline
c.nconn.SetWriteDeadline(time.Time{}) c.nconn.SetWriteDeadline(time.Time{})
rres := c.path.onPublisherRecord(pathPublisherRecordReq{ rres := c.path.publisherRecord(pathPublisherRecordReq{
author: c, author: c,
tracks: tracks, tracks: tracks,
}) })
@@ -742,16 +742,16 @@ func (c *rtmpConn) onReaderData(data *data) {
c.ringBuffer.Push(data) c.ringBuffer.Push(data)
} }
// onReaderAPIDescribe implements reader. // apiReaderDescribe implements reader.
func (c *rtmpConn) onReaderAPIDescribe() interface{} { func (c *rtmpConn) apiReaderDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
ID string `json:"id"` ID string `json:"id"`
}{"rtmpConn", c.id} }{"rtmpConn", c.id}
} }
// onSourceAPIDescribe implements source. // apiSourceDescribe implements source.
func (c *rtmpConn) onSourceAPIDescribe() interface{} { func (c *rtmpConn) apiSourceDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
ID string `json:"id"` ID string `json:"id"`

View File

@@ -64,9 +64,9 @@ type rtmpServer struct {
conns map[*rtmpConn]struct{} conns map[*rtmpConn]struct{}
// in // in
connClose chan *rtmpConn chConnClose chan *rtmpConn
apiConnsList chan rtmpServerAPIConnsListReq chAPIConnsList chan rtmpServerAPIConnsListReq
apiConnsKick chan rtmpServerAPIConnsKickReq chAPIConnsKick chan rtmpServerAPIConnsKickReq
} }
func newRTMPServer( func newRTMPServer(
@@ -107,15 +107,15 @@ func newRTMPServer(
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
l: l, l: l,
conns: make(map[*rtmpConn]struct{}), conns: make(map[*rtmpConn]struct{}),
connClose: make(chan *rtmpConn), chConnClose: make(chan *rtmpConn),
apiConnsList: make(chan rtmpServerAPIConnsListReq), chAPIConnsList: make(chan rtmpServerAPIConnsListReq),
apiConnsKick: make(chan rtmpServerAPIConnsKickReq), chAPIConnsKick: make(chan rtmpServerAPIConnsKickReq),
} }
s.log(logger.Info, "listener opened on %s", address) s.log(logger.Info, "listener opened on %s", address)
if s.metrics != nil { if s.metrics != nil {
s.metrics.onRTMPServerSet(s) s.metrics.rtmpServerSet(s)
} }
s.wg.Add(1) s.wg.Add(1)
@@ -190,13 +190,13 @@ outer:
s) s)
s.conns[c] = struct{}{} s.conns[c] = struct{}{}
case c := <-s.connClose: case c := <-s.chConnClose:
if _, ok := s.conns[c]; !ok { if _, ok := s.conns[c]; !ok {
continue continue
} }
delete(s.conns, c) delete(s.conns, c)
case req := <-s.apiConnsList: case req := <-s.chAPIConnsList:
data := &rtmpServerAPIConnsListData{ data := &rtmpServerAPIConnsListData{
Items: make(map[string]rtmpServerAPIConnsListItem), Items: make(map[string]rtmpServerAPIConnsListItem),
} }
@@ -219,7 +219,7 @@ outer:
req.res <- rtmpServerAPIConnsListRes{data: data} req.res <- rtmpServerAPIConnsListRes{data: data}
case req := <-s.apiConnsKick: case req := <-s.chAPIConnsKick:
res := func() bool { res := func() bool {
for c := range s.conns { for c := range s.conns {
if c.ID() == req.id { if c.ID() == req.id {
@@ -246,7 +246,7 @@ outer:
s.l.Close() s.l.Close()
if s.metrics != nil { if s.metrics != nil {
s.metrics.onRTMPServerSet(s) s.metrics.rtmpServerSet(s)
} }
} }
@@ -278,19 +278,19 @@ func (s *rtmpServer) newConnID() (string, error) {
} }
} }
// onConnClose is called by rtmpConn. // connClose is called by rtmpConn.
func (s *rtmpServer) onConnClose(c *rtmpConn) { func (s *rtmpServer) connClose(c *rtmpConn) {
select { select {
case s.connClose <- c: case s.chConnClose <- c:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
// onAPIConnsList is called by api. // apiConnsList is called by api.
func (s *rtmpServer) onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes { func (s *rtmpServer) apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes {
req.res = make(chan rtmpServerAPIConnsListRes) req.res = make(chan rtmpServerAPIConnsListRes)
select { select {
case s.apiConnsList <- req: case s.chAPIConnsList <- req:
return <-req.res return <-req.res
case <-s.ctx.Done(): case <-s.ctx.Done():
@@ -298,11 +298,11 @@ func (s *rtmpServer) onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPI
} }
} }
// onAPIConnsKick is called by api. // apiConnsKick is called by api.
func (s *rtmpServer) onAPIConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes { func (s *rtmpServer) apiConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes {
req.res = make(chan rtmpServerAPIConnsKickRes) req.res = make(chan rtmpServerAPIConnsKickRes)
select { select {
case s.apiConnsKick <- req: case s.chAPIConnsKick <- req:
return <-req.res return <-req.res
case <-s.ctx.Done(): case <-s.ctx.Done():

View File

@@ -21,8 +21,8 @@ import (
type rtmpSourceParent interface { type rtmpSourceParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
} }
type rtmpSource struct { type rtmpSource struct {
@@ -119,7 +119,7 @@ func (s *rtmpSource) run(ctx context.Context) error {
tracks = append(tracks, audioTrack) tracks = append(tracks, audioTrack)
} }
res := s.parent.onSourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks}) res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
if res.err != nil { if res.err != nil {
return res.err return res.err
} }
@@ -127,7 +127,7 @@ func (s *rtmpSource) run(ctx context.Context) error {
s.Log(logger.Info, "ready") s.Log(logger.Info, "ready")
defer func() { defer func() {
s.parent.onSourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}() }()
for { for {
@@ -212,8 +212,8 @@ func (s *rtmpSource) run(ctx context.Context) error {
} }
} }
// onSourceAPIDescribe implements sourceStaticImpl. // apiSourceDescribe implements sourceStaticImpl.
func (*rtmpSource) onSourceAPIDescribe() interface{} { func (*rtmpSource) apiSourceDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"rtmpSource"} }{"rtmpSource"}

View File

@@ -243,7 +243,7 @@ func (c *rtspConn) OnResponse(res *base.Response) {
// onDescribe is called by rtspServer. // onDescribe is called by rtspServer.
func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
) (*base.Response, *gortsplib.ServerStream, error) { ) (*base.Response, *gortsplib.ServerStream, error) {
res := c.pathManager.onDescribe(pathDescribeReq{ res := c.pathManager.describe(pathDescribeReq{
pathName: ctx.Path, pathName: ctx.Path,
url: ctx.Request.URL, url: ctx.Request.URL,
authenticate: func( authenticate: func(

View File

@@ -167,9 +167,9 @@ func newRTSPServer(
if s.metrics != nil { if s.metrics != nil {
if !isTLS { if !isTLS {
s.metrics.onRTSPServerSet(s) s.metrics.rtspServerSet(s)
} else { } else {
s.metrics.onRTSPSServerSet(s) s.metrics.rtspsServerSet(s)
} }
} }
@@ -219,9 +219,9 @@ outer:
if s.metrics != nil { if s.metrics != nil {
if !s.isTLS { if !s.isTLS {
s.metrics.onRTSPServerSet(nil) s.metrics.rtspServerSet(nil)
} else { } else {
s.metrics.onRTSPSServerSet(nil) s.metrics.rtspsServerSet(nil)
} }
} }
} }
@@ -392,8 +392,8 @@ func (s *rtspServer) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
se.onPacketRTP(ctx) se.onPacketRTP(ctx)
} }
// onAPISessionsList is called by api and metrics. // apiSessionsList is called by api and metrics.
func (s *rtspServer) onAPISessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes { func (s *rtspServer) apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return rtspServerAPISessionsListRes{err: fmt.Errorf("terminated")} return rtspServerAPISessionsListRes{err: fmt.Errorf("terminated")}
@@ -428,8 +428,8 @@ func (s *rtspServer) onAPISessionsList(req rtspServerAPISessionsListReq) rtspSer
return rtspServerAPISessionsListRes{data: data} return rtspServerAPISessionsListRes{data: data}
} }
// onAPISessionsKick is called by api. // apiSessionsKick is called by api.
func (s *rtspServer) onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes { func (s *rtspServer) apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return rtspServerAPISessionsKickRes{err: fmt.Errorf("terminated")} return rtspServerAPISessionsKickRes{err: fmt.Errorf("terminated")}

View File

@@ -20,8 +20,8 @@ const (
) )
type rtspSessionPathManager interface { type rtspSessionPathManager interface {
onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
} }
type rtspSessionParent interface { type rtspSessionParent interface {
@@ -112,11 +112,11 @@ func (s *rtspSession) onClose(err error) {
switch s.ss.State() { switch s.ss.State() {
case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay: case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay:
s.path.onReaderRemove(pathReaderRemoveReq{author: s}) s.path.readerRemove(pathReaderRemoveReq{author: s})
s.path = nil s.path = nil
case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord: case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord:
s.path.onPublisherRemove(pathPublisherRemoveReq{author: s}) s.path.publisherRemove(pathPublisherRemoveReq{author: s})
s.path = nil s.path = nil
} }
@@ -125,7 +125,7 @@ func (s *rtspSession) onClose(err error) {
// onAnnounce is called by rtspServer. // onAnnounce is called by rtspServer.
func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
res := s.pathManager.onPublisherAnnounce(pathPublisherAnnounceReq{ res := s.pathManager.publisherAnnounce(pathPublisherAnnounceReq{
author: s, author: s,
pathName: ctx.Path, pathName: ctx.Path,
authenticate: func( authenticate: func(
@@ -185,7 +185,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
switch s.ss.State() { switch s.ss.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
res := s.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{ res := s.pathManager.readerSetupPlay(pathReaderSetupPlayReq{
author: s, author: s,
pathName: ctx.Path, pathName: ctx.Path,
authenticate: func( authenticate: func(
@@ -249,7 +249,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
h := make(base.Header) h := make(base.Header)
if s.ss.State() == gortsplib.ServerSessionStatePrePlay { if s.ss.State() == gortsplib.ServerSessionStatePrePlay {
s.path.onReaderPlay(pathReaderPlayReq{author: s}) s.path.readerPlay(pathReaderPlayReq{author: s})
if s.path.Conf().RunOnRead != "" { if s.path.Conf().RunOnRead != "" {
s.log(logger.Info, "runOnRead command started") s.log(logger.Info, "runOnRead command started")
@@ -276,7 +276,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
// onRecord is called by rtspServer. // onRecord is called by rtspServer.
func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.onPublisherRecord(pathPublisherRecordReq{ res := s.path.publisherRecord(pathPublisherRecordReq{
author: s, author: s,
tracks: s.announcedTracks, tracks: s.announcedTracks,
}) })
@@ -306,14 +306,14 @@ func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
s.onReadCmd.Close() s.onReadCmd.Close()
} }
s.path.onReaderPause(pathReaderPauseReq{author: s}) s.path.readerPause(pathReaderPauseReq{author: s})
s.stateMutex.Lock() s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay s.state = gortsplib.ServerSessionStatePrePlay
s.stateMutex.Unlock() s.stateMutex.Unlock()
case gortsplib.ServerSessionStateRecord: case gortsplib.ServerSessionStateRecord:
s.path.onPublisherPause(pathPublisherPauseReq{author: s}) s.path.publisherPause(pathPublisherPauseReq{author: s})
s.stateMutex.Lock() s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord s.state = gortsplib.ServerSessionStatePreRecord
@@ -346,8 +346,8 @@ func (s *rtspSession) onReaderData(data *data) {
// packets are routed to the session by gortsplib.ServerStream. // packets are routed to the session by gortsplib.ServerStream.
} }
// onReaderAPIDescribe implements reader. // apiReaderDescribe implements reader.
func (s *rtspSession) onReaderAPIDescribe() interface{} { func (s *rtspSession) apiReaderDescribe() interface{} {
var typ string var typ string
if s.isTLS { if s.isTLS {
typ = "rtspsSession" typ = "rtspsSession"
@@ -361,8 +361,8 @@ func (s *rtspSession) onReaderAPIDescribe() interface{} {
}{typ, s.id} }{typ, s.id}
} }
// onSourceAPIDescribe implements source. // apiSourceDescribe implements source.
func (s *rtspSession) onSourceAPIDescribe() interface{} { func (s *rtspSession) apiSourceDescribe() interface{} {
var typ string var typ string
if s.isTLS { if s.isTLS {
typ = "rtspsSession" typ = "rtspsSession"

View File

@@ -19,8 +19,8 @@ import (
type rtspSourceParent interface { type rtspSourceParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
} }
type rtspSource struct { type rtspSource struct {
@@ -125,7 +125,7 @@ func (s *rtspSource) run(ctx context.Context) error {
} }
} }
res := s.parent.onSourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks}) res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
if res.err != nil { if res.err != nil {
return res.err return res.err
} }
@@ -133,7 +133,7 @@ func (s *rtspSource) run(ctx context.Context) error {
s.Log(logger.Info, "ready") s.Log(logger.Info, "ready")
defer func() { defer func() {
s.parent.onSourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}() }()
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) { c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
@@ -174,8 +174,8 @@ func (s *rtspSource) run(ctx context.Context) error {
} }
} }
// onSourceAPIDescribe implements sourceStaticImpl. // apiSourceDescribe implements sourceStaticImpl.
func (*rtspSource) onSourceAPIDescribe() interface{} { func (*rtspSource) apiSourceDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"rtspSource"} }{"rtspSource"}

View File

@@ -6,5 +6,5 @@ package core
// - sourceStatic // - sourceStatic
// - sourceRedirect // - sourceRedirect
type source interface { type source interface {
onSourceAPIDescribe() interface{} apiSourceDescribe() interface{}
} }

View File

@@ -3,8 +3,8 @@ package core
// sourceRedirect is a source that redirects to another one. // sourceRedirect is a source that redirects to another one.
type sourceRedirect struct{} type sourceRedirect struct{}
// onSourceAPIDescribe implements source. // apiSourceDescribe implements source.
func (*sourceRedirect) onSourceAPIDescribe() interface{} { func (*sourceRedirect) apiSourceDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"redirect"} }{"redirect"}

View File

@@ -17,13 +17,13 @@ const (
type sourceStaticImpl interface { type sourceStaticImpl interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
run(context.Context) error run(context.Context) error
onSourceAPIDescribe() interface{} apiSourceDescribe() interface{}
} }
type sourceStaticParent interface { type sourceStaticParent interface {
log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
onSourceStaticSetReady(context.Context, pathSourceStaticSetReadyReq) sourceStaticSetReady(context.Context, pathSourceStaticSetReadyReq)
onSourceStaticSetNotReady(context.Context, pathSourceStaticSetNotReadyReq) sourceStaticSetNotReady(context.Context, pathSourceStaticSetNotReadyReq)
} }
// sourceStatic is a static source. // sourceStatic is a static source.
@@ -42,9 +42,9 @@ type sourceStatic struct {
impl sourceStaticImpl impl sourceStaticImpl
running bool running bool
done chan struct{} done chan struct{}
sourceStaticImplSetReady chan pathSourceStaticSetReadyReq chSourceStaticImplSetReady chan pathSourceStaticSetReadyReq
sourceStaticImplSetNotReady chan pathSourceStaticSetNotReadyReq chSourceStaticImplSetNotReady chan pathSourceStaticSetNotReadyReq
} }
func newSourceStatic( func newSourceStatic(
@@ -58,16 +58,16 @@ func newSourceStatic(
parent sourceStaticParent, parent sourceStaticParent,
) *sourceStatic { ) *sourceStatic {
s := &sourceStatic{ s := &sourceStatic{
ur: ur, ur: ur,
protocol: protocol, protocol: protocol,
anyPortEnable: anyPortEnable, anyPortEnable: anyPortEnable,
fingerprint: fingerprint, fingerprint: fingerprint,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
readBufferCount: readBufferCount, readBufferCount: readBufferCount,
parent: parent, parent: parent,
sourceStaticImplSetReady: make(chan pathSourceStaticSetReadyReq), chSourceStaticImplSetReady: make(chan pathSourceStaticSetReadyReq),
sourceStaticImplSetNotReady: make(chan pathSourceStaticSetNotReadyReq), chSourceStaticImplSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
} }
switch { switch {
@@ -170,11 +170,11 @@ func (s *sourceStatic) runInner() {
s.impl.Log(logger.Info, "ERR: %v", err) s.impl.Log(logger.Info, "ERR: %v", err)
return return
case req := <-s.sourceStaticImplSetReady: case req := <-s.chSourceStaticImplSetReady:
s.parent.onSourceStaticSetReady(s.ctx, req) s.parent.sourceStaticSetReady(s.ctx, req)
case req := <-s.sourceStaticImplSetNotReady: case req := <-s.chSourceStaticImplSetNotReady:
s.parent.onSourceStaticSetNotReady(s.ctx, req) s.parent.sourceStaticSetNotReady(s.ctx, req)
case <-s.ctx.Done(): case <-s.ctx.Done():
innerCtxCancel() innerCtxCancel()
@@ -184,27 +184,27 @@ func (s *sourceStatic) runInner() {
} }
} }
// onSourceAPIDescribe implements source. // apiSourceDescribe implements source.
func (s *sourceStatic) onSourceAPIDescribe() interface{} { func (s *sourceStatic) apiSourceDescribe() interface{} {
return s.impl.onSourceAPIDescribe() return s.impl.apiSourceDescribe()
} }
// onSourceStaticImplSetReady is called by a sourceStaticImpl. // sourceStaticImplSetReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes { func (s *sourceStatic) sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.res = make(chan pathSourceStaticSetReadyRes) req.res = make(chan pathSourceStaticSetReadyRes)
select { select {
case s.sourceStaticImplSetReady <- req: case s.chSourceStaticImplSetReady <- req:
return <-req.res return <-req.res
case <-s.ctx.Done(): case <-s.ctx.Done():
return pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")} return pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
} }
} }
// onSourceStaticImplSetNotReady is called by a sourceStaticImpl. // sourceStaticImplSetNotReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) { func (s *sourceStatic) sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.res = make(chan struct{}) req.res = make(chan struct{})
select { select {
case s.sourceStaticImplSetNotReady <- req: case s.chSourceStaticImplSetNotReady <- req:
<-req.res <-req.res
case <-s.ctx.Done(): case <-s.ctx.Done():
} }