unexport members of private structs

This commit is contained in:
aler9
2021-10-27 21:01:00 +02:00
parent 07db2ce0ef
commit ab70f946b0
19 changed files with 287 additions and 287 deletions

View File

@@ -244,22 +244,22 @@ type apiRTMPConnsKickReq struct {
} }
type apiPathManager interface { type apiPathManager interface {
OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 onAPIPathsList(req apiPathsListReq1) apiPathsListRes1
} }
type apiRTSPServer interface { type apiRTSPServer interface {
OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes onAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes
OnAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes onAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes
} }
type apiRTMPServer interface { type apiRTMPServer interface {
OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes onAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes
OnAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes onAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes
} }
type apiParent interface { type apiParent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnAPIConfigSet(conf *conf.Conf) onAPIConfigSet(conf *conf.Conf)
} }
type api struct { type api struct {
@@ -380,7 +380,7 @@ func (a *api) onConfigSet(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.OnAPIConfigSet(&newConf) go a.parent.onAPIConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
@@ -425,7 +425,7 @@ func (a *api) onConfigPathsAdd(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.OnAPIConfigSet(&newConf) go a.parent.onAPIConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
@@ -468,7 +468,7 @@ func (a *api) onConfigPathsEdit(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.OnAPIConfigSet(&newConf) go a.parent.onAPIConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
@@ -504,13 +504,13 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.OnAPIConfigSet(&newConf) go a.parent.onAPIConfigSet(&newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onPathsList(ctx *gin.Context) { func (a *api) onPathsList(ctx *gin.Context) {
res := a.pathManager.OnAPIPathsList(apiPathsListReq1{}) res := a.pathManager.onAPIPathsList(apiPathsListReq1{})
if res.Err != nil { if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -525,7 +525,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) {
return return
} }
res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) res := a.rtspServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err != nil { if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -542,7 +542,7 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
id := ctx.Param("id") id := ctx.Param("id")
res := a.rtspServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) res := a.rtspServer.onAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id})
if res.Err != nil { if res.Err != nil {
ctx.AbortWithStatus(http.StatusNotFound) ctx.AbortWithStatus(http.StatusNotFound)
return return
@@ -557,7 +557,7 @@ func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
return return
} }
res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) res := a.rtspsServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err != nil { if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -574,7 +574,7 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
id := ctx.Param("id") id := ctx.Param("id")
res := a.rtspsServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) res := a.rtspsServer.onAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id})
if res.Err != nil { if res.Err != nil {
ctx.AbortWithStatus(http.StatusNotFound) ctx.AbortWithStatus(http.StatusNotFound)
return return
@@ -589,7 +589,7 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) {
return return
} }
res := a.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{}) res := a.rtmpServer.onAPIRTMPConnsList(apiRTMPConnsListReq{})
if res.Err != nil { if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError) ctx.AbortWithStatus(http.StatusInternalServerError)
return return
@@ -598,8 +598,8 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, res.Data) ctx.JSON(http.StatusOK, res.Data)
} }
// OnConfReload is called by core. // onConfReload is called by core.
func (a *api) OnConfReload(conf *conf.Conf) { func (a *api) onConfReload(conf *conf.Conf) {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
a.conf = conf a.conf = conf
@@ -613,7 +613,7 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
id := ctx.Param("id") id := ctx.Param("id")
res := a.rtmpServer.OnAPIRTMPConnsKick(apiRTMPConnsKickReq{ID: id}) res := a.rtmpServer.onAPIRTMPConnsKick(apiRTMPConnsKickReq{ID: id})
if res.Err != nil { if res.Err != nil {
ctx.AbortWithStatus(http.StatusNotFound) ctx.AbortWithStatus(http.StatusNotFound)
return return

View File

@@ -404,7 +404,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeMetrics { closeMetrics {
closePathManager = true closePathManager = true
} else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { } else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathManager.OnConfReload(newConf.Paths) p.pathManager.onConfReload(newConf.Paths)
} }
closeRTSPServer := false closeRTSPServer := false
@@ -495,7 +495,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
p.api.close() p.api.close()
p.api = nil p.api = nil
} else if !calledByAPI { // avoid a loop } else if !calledByAPI { // avoid a loop
p.api.OnConfReload(newConf) p.api.onConfReload(newConf)
} }
} }
@@ -551,8 +551,8 @@ func (p *Core) reloadConf(newConf *conf.Conf, calledByAPI bool) error {
return p.createResources(false) return p.createResources(false)
} }
// OnAPIConfigSet is called by api. // onAPIConfigSet is called by api.
func (p *Core) OnAPIConfigSet(conf *conf.Conf) { func (p *Core) onAPIConfigSet(conf *conf.Conf) {
select { select {
case p.apiConfigSet <- conf: case p.apiConfigSet <- conf:
case <-p.ctx.Done(): case <-p.ctx.Done():

View File

@@ -113,12 +113,12 @@ type hlsMuxerTrackIDPayloadPair struct {
} }
type hlsMuxerPathManager interface { type hlsMuxerPathManager interface {
OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
} }
type hlsMuxerParent interface { type hlsMuxerParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
OnMuxerClose(*hlsMuxer) onMuxerClose(*hlsMuxer)
} }
type hlsMuxer struct { type hlsMuxer struct {
@@ -181,12 +181,12 @@ func newHLSMuxer(
return r return r
} }
func (r *hlsMuxer) Close() { func (r *hlsMuxer) close() {
r.ctxCancel() r.ctxCancel()
} }
func (r *hlsMuxer) log(level logger.Level, format string, args ...interface{}) { func (r *hlsMuxer) log(level logger.Level, format string, args ...interface{}) {
r.parent.Log(level, "[muxer %s] "+format, append([]interface{}{r.pathName}, args...)...) r.parent.log(level, "[muxer %s] "+format, append([]interface{}{r.pathName}, args...)...)
} }
// PathName returns the path name. // PathName returns the path name.
@@ -241,13 +241,13 @@ func (r *hlsMuxer) run() {
req.Res <- hlsMuxerResponse{Status: http.StatusNotFound} req.Res <- hlsMuxerResponse{Status: http.StatusNotFound}
} }
r.parent.OnMuxerClose(r) r.parent.onMuxerClose(r)
r.log(logger.Info, "destroyed (%v)", err) r.log(logger.Info, "destroyed (%v)", err)
} }
func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := r.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{ res := r.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: r, Author: r,
PathName: r.pathName, PathName: r.pathName,
IP: nil, IP: nil,
@@ -260,7 +260,7 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
r.path = res.Path r.path = res.Path
defer func() { defer func() {
r.path.OnReaderRemove(pathReaderRemoveReq{Author: r}) r.path.onReaderRemove(pathReaderRemoveReq{Author: r})
}() }()
var videoTrack *gortsplib.Track var videoTrack *gortsplib.Track
@@ -318,7 +318,7 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount)) r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount))
r.path.OnReaderPlay(pathReaderPlayReq{Author: r}) r.path.onReaderPlay(pathReaderPlayReq{Author: r})
writerDone := make(chan error) writerDone := make(chan error)
go func() { go func() {
@@ -474,8 +474,8 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse {
} }
} }
// OnRequest is called by hlsserver.Server (forwarded from ServeHTTP). // onRequest is called by hlsserver.Server (forwarded from ServeHTTP).
func (r *hlsMuxer) OnRequest(req hlsMuxerRequest) { func (r *hlsMuxer) onRequest(req hlsMuxerRequest) {
select { select {
case r.request <- req: case r.request <- req:
case <-r.ctx.Done(): case <-r.ctx.Done():
@@ -483,20 +483,20 @@ func (r *hlsMuxer) OnRequest(req hlsMuxerRequest) {
} }
} }
// OnReaderAccepted implements reader. // onReaderAccepted implements reader.
func (r *hlsMuxer) OnReaderAccepted() { func (r *hlsMuxer) onReaderAccepted() {
r.log(logger.Info, "is converting into HLS") r.log(logger.Info, "is converting into HLS")
} }
// OnReaderFrame implements reader. // onReaderFrame implements reader.
func (r *hlsMuxer) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { func (r *hlsMuxer) onReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP { if streamType == gortsplib.StreamTypeRTP {
r.ringBuffer.Push(hlsMuxerTrackIDPayloadPair{trackID, payload}) r.ringBuffer.Push(hlsMuxerTrackIDPayloadPair{trackID, payload})
} }
} }
// OnReaderAPIDescribe implements reader. // onReaderAPIDescribe implements reader.
func (r *hlsMuxer) OnReaderAPIDescribe() interface{} { func (r *hlsMuxer) onReaderAPIDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"hlsMuxer"} }{"hlsMuxer"}

View File

@@ -76,9 +76,9 @@ func newHLSServer(
muxerClose: make(chan *hlsMuxer), muxerClose: make(chan *hlsMuxer),
} }
s.Log(logger.Info, "listener opened on "+address) s.log(logger.Info, "listener opened on "+address)
s.pathManager.OnHLSServerSet(s) s.pathManager.onHLSServerSet(s)
s.wg.Add(1) s.wg.Add(1)
go s.run() go s.run()
@@ -87,14 +87,14 @@ func newHLSServer(
} }
// Log is the main logging function. // Log is the main logging function.
func (s *hlsServer) Log(level logger.Level, format string, args ...interface{}) { func (s *hlsServer) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[HLS] "+format, append([]interface{}{}, args...)...) s.parent.Log(level, "[HLS] "+format, append([]interface{}{}, args...)...)
} }
func (s *hlsServer) close() { func (s *hlsServer) close() {
s.ctxCancel() s.ctxCancel()
s.wg.Wait() s.wg.Wait()
s.Log(logger.Info, "closed") s.log(logger.Info, "closed")
} }
func (s *hlsServer) run() { func (s *hlsServer) run() {
@@ -116,7 +116,7 @@ outer:
case req := <-s.request: case req := <-s.request:
r := s.findOrCreateMuxer(req.Dir) r := s.findOrCreateMuxer(req.Dir)
r.OnRequest(req) r.onRequest(req)
case c := <-s.muxerClose: case c := <-s.muxerClose:
if c2, ok := s.muxers[c.PathName()]; !ok || c2 != c { if c2, ok := s.muxers[c.PathName()]; !ok || c2 != c {
@@ -133,14 +133,14 @@ outer:
hs.Shutdown(context.Background()) hs.Shutdown(context.Background())
s.pathManager.OnHLSServerSet(nil) s.pathManager.onHLSServerSet(nil)
} }
func (s *hlsServer) onRequest(ctx *gin.Context) { func (s *hlsServer) onRequest(ctx *gin.Context) {
s.Log(logger.Info, "[conn %v] %s %s", ctx.Request.RemoteAddr, ctx.Request.Method, ctx.Request.URL.Path) s.log(logger.Info, "[conn %v] %s %s", ctx.Request.RemoteAddr, ctx.Request.Method, ctx.Request.URL.Path)
byts, _ := httputil.DumpRequest(ctx.Request, true) byts, _ := httputil.DumpRequest(ctx.Request, true)
s.Log(logger.Debug, "[conn %v] [c->s] %s", ctx.Request.RemoteAddr, string(byts)) s.log(logger.Debug, "[conn %v] [c->s] %s", ctx.Request.RemoteAddr, string(byts))
logw := &httpLogWriter{ResponseWriter: ctx.Writer} logw := &httpLogWriter{ResponseWriter: ctx.Writer}
ctx.Writer = logw ctx.Writer = logw
@@ -211,7 +211,7 @@ func (s *hlsServer) onRequest(ctx *gin.Context) {
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
s.Log(logger.Debug, "[conn %v] [s->c] %s", ctx.Request.RemoteAddr, logw.dump()) s.log(logger.Debug, "[conn %v] [s->c] %s", ctx.Request.RemoteAddr, logw.dump())
} }
func (s *hlsServer) findOrCreateMuxer(pathName string) *hlsMuxer { func (s *hlsServer) findOrCreateMuxer(pathName string) *hlsMuxer {
@@ -232,16 +232,16 @@ func (s *hlsServer) findOrCreateMuxer(pathName string) *hlsMuxer {
return r return r
} }
// OnMuxerClose is called by hlsMuxer. // onMuxerClose is called by hlsMuxer.
func (s *hlsServer) OnMuxerClose(c *hlsMuxer) { func (s *hlsServer) onMuxerClose(c *hlsMuxer) {
select { select {
case s.muxerClose <- c: case s.muxerClose <- c:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
// OnPathSourceReady is called by core. // onPathSourceReady is called by core.
func (s *hlsServer) OnPathSourceReady(pa *path) { func (s *hlsServer) onPathSourceReady(pa *path) {
select { select {
case s.pathSourceReady <- pa: case s.pathSourceReady <- pa:
case <-s.ctx.Done(): case <-s.ctx.Done():

View File

@@ -17,8 +17,8 @@ const (
) )
type hlsSourceParent interface { type hlsSourceParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
} }
@@ -57,13 +57,13 @@ func newHLSSource(
return s return s
} }
func (s *hlsSource) Close() { func (s *hlsSource) close() {
s.Log(logger.Info, "stopped") s.Log(logger.Info, "stopped")
s.ctxCancel() s.ctxCancel()
} }
func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) { func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[hls source] "+format, args...) s.parent.log(level, "[hls source] "+format, args...)
} }
func (s *hlsSource) run() { func (s *hlsSource) run() {
@@ -112,7 +112,7 @@ func (s *hlsSource) runInner() bool {
tracks = append(tracks, audioTrack) tracks = append(tracks, audioTrack)
} }
res := s.parent.OnSourceStaticSetReady(pathSourceStaticSetReadyReq{ res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s, Source: s,
Tracks: tracks, Tracks: tracks,
}) })
@@ -162,8 +162,8 @@ func (s *hlsSource) runInner() bool {
} }
} }
// OnSourceAPIDescribe implements source. // onSourceAPIDescribe implements source.
func (*hlsSource) OnSourceAPIDescribe() interface{} { func (*hlsSource) onSourceAPIDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"hlsSource"} }{"hlsSource"}

View File

@@ -20,15 +20,15 @@ func formatMetric(key string, value int64, nowUnix int64) string {
} }
type metricsPathManager interface { type metricsPathManager interface {
OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 onAPIPathsList(req apiPathsListReq1) apiPathsListRes1
} }
type metricsRTSPServer interface { type metricsRTSPServer interface {
OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes onAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes
} }
type metricsRTMPServer interface { type metricsRTMPServer interface {
OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes onAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes
} }
type metricsParent interface { type metricsParent interface {
@@ -87,7 +87,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
out := "" out := ""
res := m.pathManager.OnAPIPathsList(apiPathsListReq1{}) res := m.pathManager.onAPIPathsList(apiPathsListReq1{})
if res.Err == nil { if res.Err == nil {
readyCount := int64(0) readyCount := int64(0)
notReadyCount := int64(0) notReadyCount := int64(0)
@@ -107,7 +107,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
} }
if !interfaceIsEmpty(m.rtspServer) { if !interfaceIsEmpty(m.rtspServer) {
res := m.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) res := m.rtspServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err == nil { if res.Err == nil {
idleCount := int64(0) idleCount := int64(0)
readCount := int64(0) readCount := int64(0)
@@ -134,7 +134,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
} }
if !interfaceIsEmpty(m.rtspsServer) { if !interfaceIsEmpty(m.rtspsServer) {
res := m.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{}) res := m.rtspsServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err == nil { if res.Err == nil {
idleCount := int64(0) idleCount := int64(0)
readCount := int64(0) readCount := int64(0)
@@ -161,7 +161,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
} }
if !interfaceIsEmpty(m.rtmpServer) { if !interfaceIsEmpty(m.rtmpServer) {
res := m.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{}) res := m.rtmpServer.onAPIRTMPConnsList(apiRTMPConnsListReq{})
if res.Err == nil { if res.Err == nil {
idleCount := int64(0) idleCount := int64(0)
readCount := int64(0) readCount := int64(0)
@@ -191,29 +191,29 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
io.WriteString(ctx.Writer, out) io.WriteString(ctx.Writer, out)
} }
// OnPathManagerSet is called by pathManager. // onPathManagerSet is called by pathManager.
func (m *metrics) OnPathManagerSet(s metricsPathManager) { func (m *metrics) onPathManagerSet(s metricsPathManager) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.pathManager = s m.pathManager = s
} }
// OnRTSPServer is called by rtspServer (plain). // onRTSPServer is called by rtspServer (plain).
func (m *metrics) OnRTSPServerSet(s metricsRTSPServer) { func (m *metrics) onRTSPServerSet(s metricsRTSPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtspServer = s m.rtspServer = s
} }
// OnRTSPServer is called by rtspServer (plain). // onRTSPServer is called by rtspServer (plain).
func (m *metrics) OnRTSPSServerSet(s metricsRTSPServer) { func (m *metrics) onRTSPSServerSet(s metricsRTSPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtspsServer = s m.rtspsServer = s
} }
// OnRTMPServerSet is called by rtmpServer. // onRTMPServerSet is called by rtmpServer.
func (m *metrics) OnRTMPServerSet(s metricsRTMPServer) { func (m *metrics) onRTMPServerSet(s metricsRTMPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtmpServer = s m.rtmpServer = s

View File

@@ -52,9 +52,9 @@ func (pathErrAuthCritical) Error() string {
} }
type pathParent interface { type pathParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
OnPathSourceReady(*path) onPathSourceReady(*path)
OnPathClose(*path) onPathClose(*path)
} }
type pathRTSPSession interface { type pathRTSPSession interface {
@@ -63,8 +63,8 @@ type pathRTSPSession interface {
type sourceRedirect struct{} type sourceRedirect struct{}
// OnSourceAPIDescribe implements source. // onSourceAPIDescribe implements source.
func (*sourceRedirect) OnSourceAPIDescribe() interface{} { func (*sourceRedirect) onSourceAPIDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"redirect"} }{"redirect"}
@@ -269,7 +269,7 @@ func newPath(
apiPathsList: make(chan apiPathsListReq2), apiPathsList: make(chan apiPathsListReq2),
} }
pa.Log(logger.Info, "created") pa.log(logger.Info, "created")
pa.wg.Add(1) pa.wg.Add(1)
go pa.run() go pa.run()
@@ -277,14 +277,14 @@ func newPath(
return pa return pa
} }
func (pa *path) Close() { func (pa *path) close() {
pa.ctxCancel() pa.ctxCancel()
pa.Log(logger.Info, "destroyed") pa.log(logger.Info, "destroyed")
} }
// Log is the main logging function. // Log is the main logging function.
func (pa *path) Log(level logger.Level, format string, args ...interface{}) { func (pa *path) log(level logger.Level, format string, args ...interface{}) {
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...) pa.parent.log(level, "[path "+pa.name+"] "+format, args...)
} }
// ConfName returns the configuration name of this path. // ConfName returns the configuration name of this path.
@@ -313,7 +313,7 @@ func (pa *path) run() {
var onInitCmd *externalcmd.Cmd var onInitCmd *externalcmd.Cmd
if pa.conf.RunOnInit != "" { if pa.conf.RunOnInit != "" {
pa.Log(logger.Info, "runOnInit command started") pa.log(logger.Info, "runOnInit command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress) _, port, _ := net.SplitHostPort(pa.rtspAddress)
onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.conf.RunOnInitRestart, externalcmd.Environment{ onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.conf.RunOnInitRestart, externalcmd.Environment{
Path: pa.name, Path: pa.name,
@@ -420,7 +420,7 @@ outer:
if onInitCmd != nil { if onInitCmd != nil {
onInitCmd.Close() onInitCmd.Close()
pa.Log(logger.Info, "runOnInit command stopped") pa.log(logger.Info, "runOnInit command stopped")
} }
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
@@ -435,7 +435,7 @@ outer:
if state == pathReaderStatePlay { if state == pathReaderStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
} }
rp.Close() rp.close()
} }
if pa.stream != nil { if pa.stream != nil {
@@ -444,13 +444,13 @@ outer:
if pa.source != nil { if pa.source != nil {
if source, ok := pa.source.(sourceStatic); ok { if source, ok := pa.source.(sourceStatic); ok {
source.Close() source.close()
pa.sourceStaticWg.Wait() pa.sourceStaticWg.Wait()
} else if source, ok := pa.source.(publisher); ok { } else if source, ok := pa.source.(publisher); ok {
if pa.sourceReady { if pa.sourceReady {
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
} }
source.Close() source.close()
} }
} }
@@ -461,10 +461,10 @@ outer:
// the path is already waiting for the command to close. // the path is already waiting for the command to close.
if pa.onDemandCmd != nil { if pa.onDemandCmd != nil {
pa.onDemandCmd.Close() pa.onDemandCmd.Close()
pa.Log(logger.Info, "runOnDemand command stopped") pa.log(logger.Info, "runOnDemand command stopped")
} }
pa.parent.OnPathClose(pa) pa.parent.onPathClose(pa)
} }
func (pa *path) hasStaticSource() bool { func (pa *path) hasStaticSource() bool {
@@ -486,7 +486,7 @@ func (pa *path) onDemandStartSource() {
pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout)) pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
} else { } else {
pa.Log(logger.Info, "runOnDemand command started") pa.log(logger.Info, "runOnDemand command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress) _, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{ pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{
Path: pa.name, Path: pa.name,
@@ -522,11 +522,11 @@ func (pa *path) onDemandCloseSource() {
if pa.sourceReady { if pa.sourceReady {
pa.sourceSetNotReady() pa.sourceSetNotReady()
} }
pa.source.(sourceStatic).Close() pa.source.(sourceStatic).close()
pa.source = nil pa.source = nil
} else { } else {
if pa.source != nil { if pa.source != nil {
pa.source.(publisher).Close() pa.source.(publisher).close()
pa.doPublisherRemove() pa.doPublisherRemove()
} }
@@ -538,7 +538,7 @@ func (pa *path) onDemandCloseSource() {
if pa.onDemandCmd != nil { if pa.onDemandCmd != nil {
pa.onDemandCmd.Close() pa.onDemandCmd.Close()
pa.onDemandCmd = nil pa.onDemandCmd = nil
pa.Log(logger.Info, "runOnDemand command stopped") pa.log(logger.Info, "runOnDemand command stopped")
} }
} }
} }
@@ -570,13 +570,13 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
} }
} }
pa.parent.OnPathSourceReady(pa) pa.parent.onPathSourceReady(pa)
} }
func (pa *path) sourceSetNotReady() { func (pa *path) sourceSetNotReady() {
for r := range pa.readers { for r := range pa.readers {
pa.doReaderRemove(r) pa.doReaderRemove(r)
r.Close() r.close()
} }
// close onPublishCmd after all readers have been closed. // close onPublishCmd after all readers have been closed.
@@ -587,7 +587,7 @@ func (pa *path) sourceSetNotReady() {
if pa.onPublishCmd != nil { if pa.onPublishCmd != nil {
pa.onPublishCmd.Close() pa.onPublishCmd.Close()
pa.onPublishCmd = nil pa.onPublishCmd = nil
pa.Log(logger.Info, "runOnPublish command stopped") pa.log(logger.Info, "runOnPublish command stopped")
} }
pa.sourceReady = false pa.sourceReady = false
@@ -653,7 +653,7 @@ func (pa *path) doPublisherRemove() {
} else { } else {
for r := range pa.readers { for r := range pa.readers {
pa.doReaderRemove(r) pa.doReaderRemove(r)
r.Close() r.close()
} }
} }
@@ -722,8 +722,8 @@ func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
return return
} }
pa.Log(logger.Info, "closing existing publisher") pa.log(logger.Info, "closing existing publisher")
pa.source.(publisher).Close() pa.source.(publisher).close()
pa.doPublisherRemove() pa.doPublisherRemove()
} }
@@ -740,12 +740,12 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
atomic.AddInt64(pa.stats.CountPublishers, 1) atomic.AddInt64(pa.stats.CountPublishers, 1)
req.Author.OnPublisherAccepted(len(req.Tracks)) req.Author.onPublisherAccepted(len(req.Tracks))
pa.sourceSetReady(req.Tracks) pa.sourceSetReady(req.Tracks)
if pa.conf.RunOnPublish != "" { if pa.conf.RunOnPublish != "" {
pa.Log(logger.Info, "runOnPublish command started") pa.log(logger.Info, "runOnPublish command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress) _, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onPublishCmd = externalcmd.New(pa.conf.RunOnPublish, pa.conf.RunOnPublishRestart, externalcmd.Environment{ pa.onPublishCmd = externalcmd.New(pa.conf.RunOnPublish, pa.conf.RunOnPublishRestart, externalcmd.Environment{
Path: pa.name, Path: pa.name,
@@ -820,7 +820,7 @@ func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
pa.stream.readerAdd(req.Author) pa.stream.readerAdd(req.Author)
req.Author.OnReaderAccepted() req.Author.onReaderAccepted()
close(req.Res) close(req.Res)
} }
@@ -842,13 +842,13 @@ func (pa *path) handleAPIPathsList(req apiPathsListReq2) {
if pa.source == nil { if pa.source == nil {
return nil return nil
} }
return pa.source.OnSourceAPIDescribe() return pa.source.onSourceAPIDescribe()
}(), }(),
SourceReady: pa.sourceReady, SourceReady: pa.sourceReady,
Readers: func() []interface{} { Readers: func() []interface{} {
ret := []interface{}{} ret := []interface{}{}
for r := range pa.readers { for r := range pa.readers {
ret = append(ret, r.OnReaderAPIDescribe()) ret = append(ret, r.onReaderAPIDescribe())
} }
return ret return ret
}(), }(),
@@ -856,8 +856,8 @@ func (pa *path) handleAPIPathsList(req apiPathsListReq2) {
close(req.Res) close(req.Res)
} }
// OnSourceStaticSetReady is called by a sourceStatic. // onSourceStaticSetReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes { func (pa *path) onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.Res = make(chan pathSourceStaticSetReadyRes) req.Res = make(chan pathSourceStaticSetReadyRes)
select { select {
case pa.sourceStaticSetReady <- req: case pa.sourceStaticSetReady <- req:
@@ -877,8 +877,8 @@ func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
} }
} }
// OnDescribe is called by a reader or publisher through pathManager. // onDescribe is called by a reader or publisher through pathManager.
func (pa *path) OnDescribe(req pathDescribeReq) pathDescribeRes { func (pa *path) onDescribe(req pathDescribeReq) pathDescribeRes {
select { select {
case pa.describe <- req: case pa.describe <- req:
return <-req.Res return <-req.Res
@@ -887,8 +887,8 @@ func (pa *path) OnDescribe(req pathDescribeReq) pathDescribeRes {
} }
} }
// OnPublisherRemove is called by a publisher. // onPublisherRemove is called by a publisher.
func (pa *path) OnPublisherRemove(req pathPublisherRemoveReq) { func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) {
req.Res = make(chan struct{}) req.Res = make(chan struct{})
select { select {
case pa.publisherRemove <- req: case pa.publisherRemove <- req:
@@ -897,8 +897,8 @@ func (pa *path) OnPublisherRemove(req pathPublisherRemoveReq) {
} }
} }
// OnPublisherAnnounce is called by a publisher through pathManager. // onPublisherAnnounce is called by a publisher through pathManager.
func (pa *path) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes { func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
select { select {
case pa.publisherAnnounce <- req: case pa.publisherAnnounce <- req:
return <-req.Res return <-req.Res
@@ -907,8 +907,8 @@ func (pa *path) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherA
} }
} }
// OnPublisherRecord is called by a publisher. // onPublisherRecord is called by a publisher.
func (pa *path) OnPublisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes { func (pa *path) onPublisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes {
req.Res = make(chan pathPublisherRecordRes) req.Res = make(chan pathPublisherRecordRes)
select { select {
case pa.publisherRecord <- req: case pa.publisherRecord <- req:
@@ -918,8 +918,8 @@ func (pa *path) OnPublisherRecord(req pathPublisherRecordReq) pathPublisherRecor
} }
} }
// OnPublisherPause is called by a publisher. // onPublisherPause is called by a publisher.
func (pa *path) OnPublisherPause(req pathPublisherPauseReq) { func (pa *path) onPublisherPause(req pathPublisherPauseReq) {
req.Res = make(chan struct{}) req.Res = make(chan struct{})
select { select {
case pa.publisherPause <- req: case pa.publisherPause <- req:
@@ -928,8 +928,8 @@ func (pa *path) OnPublisherPause(req pathPublisherPauseReq) {
} }
} }
// OnReaderRemove is called by a reader. // onReaderRemove is called by a reader.
func (pa *path) OnReaderRemove(req pathReaderRemoveReq) { func (pa *path) onReaderRemove(req pathReaderRemoveReq) {
req.Res = make(chan struct{}) req.Res = make(chan struct{})
select { select {
case pa.readerRemove <- req: case pa.readerRemove <- req:
@@ -938,8 +938,8 @@ func (pa *path) OnReaderRemove(req pathReaderRemoveReq) {
} }
} }
// OnReaderSetupPlay is called by a reader through pathManager. // onReaderSetupPlay is called by a reader through pathManager.
func (pa *path) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes { func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
select { select {
case pa.readerSetupPlay <- req: case pa.readerSetupPlay <- req:
return <-req.Res return <-req.Res
@@ -948,8 +948,8 @@ func (pa *path) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPla
} }
} }
// OnReaderPlay is called by a reader. // onReaderPlay is called by a reader.
func (pa *path) OnReaderPlay(req pathReaderPlayReq) { func (pa *path) onReaderPlay(req pathReaderPlayReq) {
req.Res = make(chan struct{}) req.Res = make(chan struct{})
select { select {
case pa.readerPlay <- req: case pa.readerPlay <- req:
@@ -958,8 +958,8 @@ func (pa *path) OnReaderPlay(req pathReaderPlayReq) {
} }
} }
// OnReaderPause is called by a reader. // onReaderPause is called by a reader.
func (pa *path) OnReaderPause(req pathReaderPauseReq) { func (pa *path) onReaderPause(req pathReaderPauseReq) {
req.Res = make(chan struct{}) req.Res = make(chan struct{})
select { select {
case pa.readerPause <- req: case pa.readerPause <- req:
@@ -968,8 +968,8 @@ func (pa *path) OnReaderPause(req pathReaderPauseReq) {
} }
} }
// OnAPIPathsList is called by api. // onAPIPathsList is called by api.
func (pa *path) OnAPIPathsList(req apiPathsListReq2) { func (pa *path) onAPIPathsList(req apiPathsListReq2) {
req.Res = make(chan struct{}) req.Res = make(chan struct{})
select { select {
case pa.apiPathsList <- req: case pa.apiPathsList <- req:

View File

@@ -13,7 +13,7 @@ import (
) )
type pathManagerHLSServer interface { type pathManagerHLSServer interface {
OnPathSourceReady(pa *path) onPathSourceReady(pa *path)
} }
type pathManagerParent interface { type pathManagerParent interface {
@@ -91,7 +91,7 @@ func newPathManager(
} }
if pm.metrics != nil { if pm.metrics != nil {
pm.metrics.OnPathManagerSet(pm) pm.metrics.onPathManagerSet(pm)
} }
pm.wg.Add(1) pm.wg.Add(1)
@@ -106,7 +106,7 @@ func (pm *pathManager) close() {
} }
// Log is the main logging function. // Log is the main logging function.
func (pm *pathManager) Log(level logger.Level, format string, args ...interface{}) { func (pm *pathManager) log(level logger.Level, format string, args ...interface{}) {
pm.parent.Log(level, format, args...) pm.parent.Log(level, format, args...)
} }
@@ -143,7 +143,7 @@ outer:
for _, pa := range pm.paths { for _, pa := range pm.paths {
if pathConf, ok := pm.pathConfs[pa.ConfName()]; !ok || pathConf != pa.Conf() { if pathConf, ok := pm.pathConfs[pa.ConfName()]; !ok || pathConf != pa.Conf() {
delete(pm.paths, pa.Name()) delete(pm.paths, pa.Name())
pa.Close() pa.close()
} }
} }
@@ -159,11 +159,11 @@ outer:
continue continue
} }
delete(pm.paths, pa.Name()) delete(pm.paths, pa.Name())
pa.Close() pa.close()
case pa := <-pm.pathSourceReady: case pa := <-pm.pathSourceReady:
if pm.hlsServer != nil { if pm.hlsServer != nil {
pm.hlsServer.OnPathSourceReady(pa) pm.hlsServer.onPathSourceReady(pa)
} }
case req := <-pm.describe: case req := <-pm.describe:
@@ -269,7 +269,7 @@ outer:
pm.ctxCancel() pm.ctxCancel()
if pm.metrics != nil { if pm.metrics != nil {
pm.metrics.OnPathManagerSet(nil) pm.metrics.onPathManagerSet(nil)
} }
} }
@@ -341,32 +341,32 @@ func (pm *pathManager) authenticate(
return nil return nil
} }
// OnConfReload is called by core. // onConfReload is called by core.
func (pm *pathManager) OnConfReload(pathConfs map[string]*conf.PathConf) { func (pm *pathManager) onConfReload(pathConfs map[string]*conf.PathConf) {
select { select {
case pm.confReload <- pathConfs: case pm.confReload <- pathConfs:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// OnPathSourceReady is called by path. // onPathSourceReady is called by path.
func (pm *pathManager) OnPathSourceReady(pa *path) { func (pm *pathManager) onPathSourceReady(pa *path) {
select { select {
case pm.pathSourceReady <- pa: case pm.pathSourceReady <- pa:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// OnPathClose is called by path. // onPathClose is called by path.
func (pm *pathManager) OnPathClose(pa *path) { func (pm *pathManager) onPathClose(pa *path) {
select { select {
case pm.pathClose <- pa: case pm.pathClose <- pa:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// OnDescribe is called by a reader or publisher. // onDescribe is called by a reader or publisher.
func (pm *pathManager) OnDescribe(req pathDescribeReq) pathDescribeRes { func (pm *pathManager) onDescribe(req pathDescribeReq) pathDescribeRes {
req.Res = make(chan pathDescribeRes) req.Res = make(chan pathDescribeRes)
select { select {
case pm.describe <- req: case pm.describe <- req:
@@ -375,15 +375,15 @@ func (pm *pathManager) OnDescribe(req pathDescribeReq) pathDescribeRes {
return res return res
} }
return res.Path.OnDescribe(req) return res.Path.onDescribe(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathDescribeRes{Err: fmt.Errorf("terminated")} return pathDescribeRes{Err: fmt.Errorf("terminated")}
} }
} }
// OnPublisherAnnounce is called by a publisher. // onPublisherAnnounce is called by a publisher.
func (pm *pathManager) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes { func (pm *pathManager) onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
req.Res = make(chan pathPublisherAnnounceRes) req.Res = make(chan pathPublisherAnnounceRes)
select { select {
case pm.publisherAnnounce <- req: case pm.publisherAnnounce <- req:
@@ -392,15 +392,15 @@ func (pm *pathManager) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPub
return res return res
} }
return res.Path.OnPublisherAnnounce(req) return res.Path.onPublisherAnnounce(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")} return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
} }
} }
// OnReaderSetupPlay is called by a reader. // onReaderSetupPlay is called by a reader.
func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes { func (pm *pathManager) onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
req.Res = make(chan pathReaderSetupPlayRes) req.Res = make(chan pathReaderSetupPlayRes)
select { select {
case pm.readerSetupPlay <- req: case pm.readerSetupPlay <- req:
@@ -409,23 +409,23 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS
return res return res
} }
return res.Path.OnReaderSetupPlay(req) return res.Path.onReaderSetupPlay(req)
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")} return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
} }
} }
// OnHLSServerSet is called by hlsServer. // onHLSServerSet is called by hlsServer.
func (pm *pathManager) OnHLSServerSet(s pathManagerHLSServer) { func (pm *pathManager) onHLSServerSet(s pathManagerHLSServer) {
select { select {
case pm.hlsServerSet <- s: case pm.hlsServerSet <- s:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
} }
} }
// OnAPIPathsList is called by api. // onAPIPathsList is called by api.
func (pm *pathManager) OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 { func (pm *pathManager) onAPIPathsList(req apiPathsListReq1) apiPathsListRes1 {
req.Res = make(chan apiPathsListRes1) req.Res = make(chan apiPathsListRes1)
select { select {
case pm.apiPathsList <- req: case pm.apiPathsList <- req:
@@ -436,7 +436,7 @@ func (pm *pathManager) OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 {
} }
for _, pa := range res1.Paths { for _, pa := range res1.Paths {
pa.OnAPIPathsList(apiPathsListReq2{Data: res1.Data}) pa.onAPIPathsList(apiPathsListReq2{Data: res1.Data})
} }
return res1 return res1

View File

@@ -3,6 +3,6 @@ package core
// publisher is an entity that can publish a stream dynamically. // publisher is an entity that can publish a stream dynamically.
type publisher interface { type publisher interface {
source source
Close() close()
OnPublisherAccepted(tracksLen int) onPublisherAccepted(tracksLen int)
} }

View File

@@ -6,8 +6,8 @@ import (
// reader is an entity that can read a stream. // reader is an entity that can read a stream.
type reader interface { type reader interface {
Close() close()
OnReaderAccepted() onReaderAccepted()
OnReaderFrame(int, gortsplib.StreamType, []byte) onReaderFrame(int, gortsplib.StreamType, []byte)
OnReaderAPIDescribe() interface{} onReaderAPIDescribe() interface{}
} }

View File

@@ -43,13 +43,13 @@ type rtmpConnTrackIDPayloadPair struct {
} }
type rtmpConnPathManager interface { type rtmpConnPathManager interface {
OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
} }
type rtmpConnParent interface { type rtmpConnParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
OnConnClose(*rtmpConn) onConnClose(*rtmpConn)
} }
type rtmpConn struct { type rtmpConn struct {
@@ -113,7 +113,7 @@ func newRTMPConn(
} }
// Close closes a Conn. // Close closes a Conn.
func (c *rtmpConn) Close() { func (c *rtmpConn) close() {
c.ctxCancel() c.ctxCancel()
} }
@@ -128,7 +128,7 @@ func (c *rtmpConn) RemoteAddr() net.Addr {
} }
func (c *rtmpConn) log(level logger.Level, format string, args ...interface{}) { func (c *rtmpConn) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) c.parent.log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
} }
func (c *rtmpConn) ip() net.IP { func (c *rtmpConn) ip() net.IP {
@@ -179,7 +179,7 @@ func (c *rtmpConn) run() {
c.ctxCancel() c.ctxCancel()
c.parent.OnConnClose(c) c.parent.onConnClose(c)
c.log(logger.Info, "closed (%v)", err) c.log(logger.Info, "closed (%v)", err)
} }
@@ -206,7 +206,7 @@ func (c *rtmpConn) runInner(ctx context.Context) error {
func (c *rtmpConn) runRead(ctx context.Context) error { func (c *rtmpConn) runRead(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
res := c.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{ res := c.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: c, Author: c,
PathName: pathName, PathName: pathName,
IP: c.ip(), IP: c.ip(),
@@ -227,7 +227,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
c.path = res.Path c.path = res.Path
defer func() { defer func() {
c.path.OnReaderRemove(pathReaderRemoveReq{Author: c}) c.path.onReaderRemove(pathReaderRemoveReq{Author: c})
}() }()
c.stateMutex.Lock() c.stateMutex.Lock()
@@ -278,7 +278,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
c.ringBuffer.Close() c.ringBuffer.Close()
}() }()
c.path.OnReaderPlay(pathReaderPlayReq{ c.path.onReaderPlay(pathReaderPlayReq{
Author: c, Author: c,
}) })
@@ -448,7 +448,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
res := c.pathManager.OnPublisherAnnounce(pathPublisherAnnounceReq{ res := c.pathManager.onPublisherAnnounce(pathPublisherAnnounceReq{
Author: c, Author: c,
PathName: pathName, PathName: pathName,
IP: c.ip(), IP: c.ip(),
@@ -469,7 +469,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
c.path = res.Path c.path = res.Path
defer func() { defer func() {
c.path.OnPublisherRemove(pathPublisherRemoveReq{Author: c}) c.path.onPublisherRemove(pathPublisherRemoveReq{Author: c})
}() }()
c.stateMutex.Lock() c.stateMutex.Lock()
@@ -479,7 +479,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
// disable write deadline // disable write deadline
c.conn.NetConn().SetWriteDeadline(time.Time{}) c.conn.NetConn().SetWriteDeadline(time.Time{})
rres := c.path.OnPublisherRecord(pathPublisherRecordReq{ rres := c.path.onPublisherRecord(pathPublisherRecordReq{
Author: c, Author: c,
Tracks: tracks, Tracks: tracks,
}) })
@@ -589,36 +589,36 @@ func (c *rtmpConn) validateCredentials(
return nil return nil
} }
// OnReaderAccepted implements reader. // onReaderAccepted implements reader.
func (c *rtmpConn) OnReaderAccepted() { func (c *rtmpConn) onReaderAccepted() {
c.log(logger.Info, "is reading from path '%s'", c.path.Name()) c.log(logger.Info, "is reading from path '%s'", c.path.Name())
} }
// OnReaderFrame implements reader. // onReaderFrame implements reader.
func (c *rtmpConn) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { func (c *rtmpConn) onReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP { if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(rtmpConnTrackIDPayloadPair{trackID, payload}) c.ringBuffer.Push(rtmpConnTrackIDPayloadPair{trackID, payload})
} }
} }
// OnReaderAPIDescribe implements reader. // onReaderAPIDescribe implements reader.
func (c *rtmpConn) OnReaderAPIDescribe() interface{} { func (c *rtmpConn) onReaderAPIDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
ID string `json:"id"` ID string `json:"id"`
}{"rtmpConn", c.id} }{"rtmpConn", c.id}
} }
// OnSourceAPIDescribe implements source. // onSourceAPIDescribe implements source.
func (c *rtmpConn) OnSourceAPIDescribe() interface{} { func (c *rtmpConn) onSourceAPIDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
ID string `json:"id"` ID string `json:"id"`
}{"rtmpConn", c.id} }{"rtmpConn", c.id}
} }
// OnPublisherAccepted implements publisher. // onPublisherAccepted implements publisher.
func (c *rtmpConn) OnPublisherAccepted(tracksLen int) { func (c *rtmpConn) onPublisherAccepted(tracksLen int) {
c.log(logger.Info, "is publishing to path '%s', %d %s", c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(), c.path.Name(),
tracksLen, tracksLen,

View File

@@ -80,10 +80,10 @@ func newRTMPServer(
apiRTMPConnsKick: make(chan apiRTMPConnsKickReq), apiRTMPConnsKick: make(chan apiRTMPConnsKickReq),
} }
s.Log(logger.Info, "listener opened on %s", address) s.log(logger.Info, "listener opened on %s", address)
if s.metrics != nil { if s.metrics != nil {
s.metrics.OnRTMPServerSet(s) s.metrics.onRTMPServerSet(s)
} }
s.wg.Add(1) s.wg.Add(1)
@@ -92,14 +92,14 @@ func newRTMPServer(
return s, nil return s, nil
} }
func (s *rtmpServer) Log(level logger.Level, format string, args ...interface{}) { func (s *rtmpServer) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[RTMP] "+format, append([]interface{}{}, args...)...) s.parent.Log(level, "[RTMP] "+format, append([]interface{}{}, args...)...)
} }
func (s *rtmpServer) close() { func (s *rtmpServer) close() {
s.ctxCancel() s.ctxCancel()
s.wg.Wait() s.wg.Wait()
s.Log(logger.Info, "closed") s.log(logger.Info, "closed")
} }
func (s *rtmpServer) run() { func (s *rtmpServer) run() {
@@ -135,7 +135,7 @@ outer:
for { for {
select { select {
case err := <-acceptErr: case err := <-acceptErr:
s.Log(logger.Warn, "ERR: %s", err) s.log(logger.Warn, "ERR: %s", err)
break outer break outer
case nconn := <-connNew: case nconn := <-connNew:
@@ -190,7 +190,7 @@ outer:
for c := range s.conns { for c := range s.conns {
if c.ID() == req.ID { if c.ID() == req.ID {
delete(s.conns, c) delete(s.conns, c)
c.Close() c.close()
return true return true
} }
} }
@@ -212,7 +212,7 @@ outer:
s.l.Close() s.l.Close()
if s.metrics != nil { if s.metrics != nil {
s.metrics.OnRTMPServerSet(s) s.metrics.onRTMPServerSet(s)
} }
} }
@@ -244,16 +244,16 @@ func (s *rtmpServer) newConnID() (string, error) {
} }
} }
// OnConnClose is called by rtmpConn. // onConnClose is called by rtmpConn.
func (s *rtmpServer) OnConnClose(c *rtmpConn) { func (s *rtmpServer) onConnClose(c *rtmpConn) {
select { select {
case s.connClose <- c: case s.connClose <- c:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
// OnAPIRTMPConnsList is called by api. // onAPIRTMPConnsList is called by api.
func (s *rtmpServer) OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes { func (s *rtmpServer) onAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes {
req.Res = make(chan apiRTMPConnsListRes) req.Res = make(chan apiRTMPConnsListRes)
select { select {
case s.apiRTMPConnsList <- req: case s.apiRTMPConnsList <- req:
@@ -263,8 +263,8 @@ func (s *rtmpServer) OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsLis
} }
} }
// OnAPIRTMPConnsKick is called by api. // onAPIRTMPConnsKick is called by api.
func (s *rtmpServer) OnAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes { func (s *rtmpServer) onAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes {
req.Res = make(chan apiRTMPConnsKickRes) req.Res = make(chan apiRTMPConnsKickRes)
select { select {
case s.apiRTMPConnsKick <- req: case s.apiRTMPConnsKick <- req:

View File

@@ -23,8 +23,8 @@ const (
) )
type rtmpSourceParent interface { type rtmpSourceParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
} }
@@ -67,13 +67,13 @@ func newRTMPSource(
} }
// Close closes a Source. // Close closes a Source.
func (s *rtmpSource) Close() { func (s *rtmpSource) close() {
s.log(logger.Info, "stopped") s.log(logger.Info, "stopped")
s.ctxCancel() s.ctxCancel()
} }
func (s *rtmpSource) log(level logger.Level, format string, args ...interface{}) { func (s *rtmpSource) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtmp source] "+format, args...) s.parent.log(level, "[rtmp source] "+format, args...)
} }
func (s *rtmpSource) run() { func (s *rtmpSource) run() {
@@ -149,7 +149,7 @@ func (s *rtmpSource) runInner() bool {
tracks = append(tracks, audioTrack) tracks = append(tracks, audioTrack)
} }
res := s.parent.OnSourceStaticSetReady(pathSourceStaticSetReadyReq{ res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s, Source: s,
Tracks: tracks, Tracks: tracks,
}) })
@@ -272,8 +272,8 @@ func (s *rtmpSource) runInner() bool {
} }
} }
// OnSourceAPIDescribe implements source. // onSourceAPIDescribe implements source.
func (*rtmpSource) OnSourceAPIDescribe() interface{} { func (*rtmpSource) onSourceAPIDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"rtmpSource"} }{"rtmpSource"}

View File

@@ -20,7 +20,7 @@ const (
) )
type rtspConnParent interface { type rtspConnParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
} }
type rtspConn struct { type rtspConn struct {
@@ -75,7 +75,7 @@ func newRTSPConn(
} }
func (c *rtspConn) log(level logger.Level, format string, args ...interface{}) { func (c *rtspConn) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) c.parent.log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
} }
// Conn returns the RTSP connection. // Conn returns the RTSP connection.
@@ -152,8 +152,8 @@ func (c *rtspConn) validateCredentials(
return nil return nil
} }
// OnClose is called by rtspServer. // onClose is called by rtspServer.
func (c *rtspConn) OnClose(err error) { func (c *rtspConn) onClose(err error) {
c.log(logger.Info, "closed (%v)", err) c.log(logger.Info, "closed (%v)", err)
if c.onConnectCmd != nil { if c.onConnectCmd != nil {
@@ -162,8 +162,8 @@ func (c *rtspConn) OnClose(err error) {
} }
} }
// OnRequest is called by rtspServer. // onRequest is called by rtspServer.
func (c *rtspConn) OnRequest(req *base.Request) { func (c *rtspConn) onRequest(req *base.Request) {
c.log(logger.Debug, "[c->s] %v", req) c.log(logger.Debug, "[c->s] %v", req)
} }
@@ -172,10 +172,10 @@ func (c *rtspConn) OnResponse(res *base.Response) {
c.log(logger.Debug, "[s->c] %v", res) c.log(logger.Debug, "[s->c] %v", res)
} }
// OnDescribe is called by rtspServer. // onDescribe is called by rtspServer.
func (c *rtspConn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
) (*base.Response, *gortsplib.ServerStream, error) { ) (*base.Response, *gortsplib.ServerStream, error) {
res := c.pathManager.OnDescribe(pathDescribeReq{ res := c.pathManager.onDescribe(pathDescribeReq{
PathName: ctx.Path, PathName: ctx.Path,
URL: ctx.Req.URL, URL: ctx.Req.URL,
IP: c.ip(), IP: c.ip(),

View File

@@ -120,20 +120,20 @@ func newRTSPServer(
} }
if s.srv.UDPRTPAddress != "" { if s.srv.UDPRTPAddress != "" {
s.Log(logger.Info, "UDP/RTP listener opened on %s", s.srv.UDPRTPAddress) s.log(logger.Info, "UDP/RTP listener opened on %s", s.srv.UDPRTPAddress)
} }
if s.srv.UDPRTCPAddress != "" { if s.srv.UDPRTCPAddress != "" {
s.Log(logger.Info, "UDP/RTCP listener opened on %s", s.srv.UDPRTCPAddress) s.log(logger.Info, "UDP/RTCP listener opened on %s", s.srv.UDPRTCPAddress)
} }
s.Log(logger.Info, "TCP listener opened on %s", address) s.log(logger.Info, "TCP listener opened on %s", address)
if s.metrics != nil { if s.metrics != nil {
if !isTLS { if !isTLS {
s.metrics.OnRTSPServerSet(s) s.metrics.onRTSPServerSet(s)
} else { } else {
s.metrics.OnRTSPSServerSet(s) s.metrics.onRTSPSServerSet(s)
} }
} }
@@ -143,7 +143,7 @@ func newRTSPServer(
return s, nil return s, nil
} }
func (s *rtspServer) Log(level logger.Level, format string, args ...interface{}) { func (s *rtspServer) log(level logger.Level, format string, args ...interface{}) {
label := func() string { label := func() string {
if s.isTLS { if s.isTLS {
return "RTSPS" return "RTSPS"
@@ -156,7 +156,7 @@ func (s *rtspServer) Log(level logger.Level, format string, args ...interface{})
func (s *rtspServer) close() { func (s *rtspServer) close() {
s.ctxCancel() s.ctxCancel()
s.wg.Wait() s.wg.Wait()
s.Log(logger.Info, "closed") s.log(logger.Info, "closed")
} }
func (s *rtspServer) run() { func (s *rtspServer) run() {
@@ -178,7 +178,7 @@ func (s *rtspServer) run() {
outer: outer:
select { select {
case err := <-serverErr: case err := <-serverErr:
s.Log(logger.Warn, "ERR: %s", err) s.log(logger.Warn, "ERR: %s", err)
break outer break outer
case <-s.ctx.Done(): case <-s.ctx.Done():
@@ -191,9 +191,9 @@ outer:
if s.metrics != nil { if s.metrics != nil {
if !s.isTLS { if !s.isTLS {
s.metrics.OnRTSPServerSet(nil) s.metrics.onRTSPServerSet(nil)
} else { } else {
s.metrics.OnRTSPSServerSet(nil) s.metrics.onRTSPSServerSet(nil)
} }
} }
} }
@@ -250,7 +250,7 @@ func (s *rtspServer) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
delete(s.conns, ctx.Conn) delete(s.conns, ctx.Conn)
s.mutex.Unlock() s.mutex.Unlock()
c.OnClose(ctx.Error) c.onClose(ctx.Error)
} }
// OnRequest implements gortsplib.ServerHandlerOnRequest. // OnRequest implements gortsplib.ServerHandlerOnRequest.
@@ -259,7 +259,7 @@ func (s *rtspServer) OnRequest(sc *gortsplib.ServerConn, req *base.Request) {
c := s.conns[sc] c := s.conns[sc]
s.mutex.Unlock() s.mutex.Unlock()
c.OnRequest(req) c.onRequest(req)
} }
// OnResponse implements gortsplib.ServerHandlerOnResponse. // OnResponse implements gortsplib.ServerHandlerOnResponse.
@@ -299,7 +299,7 @@ func (s *rtspServer) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCt
s.mutex.Unlock() s.mutex.Unlock()
if se != nil { if se != nil {
se.OnClose(ctx.Error) se.onClose(ctx.Error)
} }
} }
@@ -309,7 +309,7 @@ func (s *rtspServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
s.mutex.RLock() s.mutex.RLock()
c := s.conns[ctx.Conn] c := s.conns[ctx.Conn]
s.mutex.RUnlock() s.mutex.RUnlock()
return c.OnDescribe(ctx) return c.onDescribe(ctx)
} }
// OnAnnounce implements gortsplib.ServerHandlerOnAnnounce. // OnAnnounce implements gortsplib.ServerHandlerOnAnnounce.
@@ -318,7 +318,7 @@ func (s *rtspServer) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*bas
c := s.conns[ctx.Conn] c := s.conns[ctx.Conn]
se := s.sessions[ctx.Session] se := s.sessions[ctx.Session]
s.mutex.RUnlock() s.mutex.RUnlock()
return se.OnAnnounce(c, ctx) return se.onAnnounce(c, ctx)
} }
// OnSetup implements gortsplib.ServerHandlerOnSetup. // OnSetup implements gortsplib.ServerHandlerOnSetup.
@@ -327,7 +327,7 @@ func (s *rtspServer) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Resp
c := s.conns[ctx.Conn] c := s.conns[ctx.Conn]
se := s.sessions[ctx.Session] se := s.sessions[ctx.Session]
s.mutex.RUnlock() s.mutex.RUnlock()
return se.OnSetup(c, ctx) return se.onSetup(c, ctx)
} }
// OnPlay implements gortsplib.ServerHandlerOnPlay. // OnPlay implements gortsplib.ServerHandlerOnPlay.
@@ -335,7 +335,7 @@ func (s *rtspServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respon
s.mutex.RLock() s.mutex.RLock()
se := s.sessions[ctx.Session] se := s.sessions[ctx.Session]
s.mutex.RUnlock() s.mutex.RUnlock()
return se.OnPlay(ctx) return se.onPlay(ctx)
} }
// OnRecord implements gortsplib.ServerHandlerOnRecord. // OnRecord implements gortsplib.ServerHandlerOnRecord.
@@ -343,7 +343,7 @@ func (s *rtspServer) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Re
s.mutex.RLock() s.mutex.RLock()
se := s.sessions[ctx.Session] se := s.sessions[ctx.Session]
s.mutex.RUnlock() s.mutex.RUnlock()
return se.OnRecord(ctx) return se.onRecord(ctx)
} }
// OnPause implements gortsplib.ServerHandlerOnPause. // OnPause implements gortsplib.ServerHandlerOnPause.
@@ -351,7 +351,7 @@ func (s *rtspServer) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Resp
s.mutex.RLock() s.mutex.RLock()
se := s.sessions[ctx.Session] se := s.sessions[ctx.Session]
s.mutex.RUnlock() s.mutex.RUnlock()
return se.OnPause(ctx) return se.onPause(ctx)
} }
// OnFrame implements gortsplib.ServerHandlerOnFrame. // OnFrame implements gortsplib.ServerHandlerOnFrame.
@@ -359,11 +359,11 @@ func (s *rtspServer) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
s.mutex.RLock() s.mutex.RLock()
se := s.sessions[ctx.Session] se := s.sessions[ctx.Session]
s.mutex.RUnlock() s.mutex.RUnlock()
se.OnFrame(ctx) se.onFrame(ctx)
} }
// OnAPIRTSPSessionsList is called by api and metrics. // onAPIRTSPSessionsList is called by api and metrics.
func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes { func (s *rtspServer) onAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return apiRTSPSessionsListRes{Err: fmt.Errorf("terminated")} return apiRTSPSessionsListRes{Err: fmt.Errorf("terminated")}
@@ -398,8 +398,8 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe
return apiRTSPSessionsListRes{Data: data} return apiRTSPSessionsListRes{Data: data}
} }
// OnAPIRTSPSessionsKick is called by api. // onAPIRTSPSessionsKick is called by api.
func (s *rtspServer) OnAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes { func (s *rtspServer) onAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return apiRTSPSessionsKickRes{Err: fmt.Errorf("terminated")} return apiRTSPSessionsKickRes{Err: fmt.Errorf("terminated")}
@@ -411,9 +411,9 @@ func (s *rtspServer) OnAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSe
for key, se := range s.sessions { for key, se := range s.sessions {
if se.ID() == req.ID { if se.ID() == req.ID {
se.Close() se.close()
delete(s.sessions, key) delete(s.sessions, key)
se.OnClose(liberrors.ErrServerTerminated{}) se.onClose(liberrors.ErrServerTerminated{})
return apiRTSPSessionsKickRes{} return apiRTSPSessionsKickRes{}
} }
} }

View File

@@ -20,12 +20,12 @@ const (
) )
type rtspSessionPathManager interface { type rtspSessionPathManager interface {
OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
} }
type rtspSessionParent interface { type rtspSessionParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
} }
type rtspSession struct { type rtspSession struct {
@@ -73,7 +73,7 @@ func newRTSPSession(
} }
// Close closes a Session. // Close closes a Session.
func (s *rtspSession) Close() { func (s *rtspSession) close() {
s.ss.Close() s.ss.Close()
} }
@@ -97,11 +97,11 @@ func (s *rtspSession) RemoteAddr() net.Addr {
} }
func (s *rtspSession) log(level logger.Level, format string, args ...interface{}) { func (s *rtspSession) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.id}, args...)...) s.parent.log(level, "[session %s] "+format, append([]interface{}{s.id}, args...)...)
} }
// OnClose is called by rtspServer. // onClose is called by rtspServer.
func (s *rtspSession) OnClose(err error) { func (s *rtspSession) onClose(err error) {
if s.ss.State() == gortsplib.ServerSessionStateRead { if s.ss.State() == gortsplib.ServerSessionStateRead {
if s.onReadCmd != nil { if s.onReadCmd != nil {
s.onReadCmd.Close() s.onReadCmd.Close()
@@ -112,20 +112,20 @@ func (s *rtspSession) OnClose(err error) {
switch s.ss.State() { switch s.ss.State() {
case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead: case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead:
s.path.OnReaderRemove(pathReaderRemoveReq{Author: s}) s.path.onReaderRemove(pathReaderRemoveReq{Author: s})
s.path = nil s.path = nil
case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish: case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish:
s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s}) s.path.onPublisherRemove(pathPublisherRemoveReq{Author: s})
s.path = nil s.path = nil
} }
s.log(logger.Info, "closed (%v)", err) s.log(logger.Info, "closed (%v)", err)
} }
// OnAnnounce is called by rtspServer. // onAnnounce is called by rtspServer.
func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
res := s.pathManager.OnPublisherAnnounce(pathPublisherAnnounceReq{ res := s.pathManager.onPublisherAnnounce(pathPublisherAnnounceReq{
Author: s, Author: s,
PathName: ctx.Path, PathName: ctx.Path,
IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
@@ -164,8 +164,8 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
}, nil }, nil
} }
// OnSetup is called by rtspServer. // onSetup is called by rtspServer.
func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCtx, func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCtx,
) (*base.Response, *gortsplib.ServerStream, error) { ) (*base.Response, *gortsplib.ServerStream, error) {
// in case the client is setupping a stream with UDP or UDP-multicast, and these // in case the client is setupping a stream with UDP or UDP-multicast, and these
// transport protocols are disabled, gortsplib already blocks the request. // transport protocols are disabled, gortsplib already blocks the request.
@@ -181,7 +181,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
switch s.ss.State() { switch s.ss.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePreRead: // play case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePreRead: // play
res := s.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{ res := s.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: s, Author: s,
PathName: ctx.Path, PathName: ctx.Path,
IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
@@ -241,12 +241,12 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
} }
} }
// OnPlay is called by rtspServer. // onPlay is called by rtspServer.
func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
h := make(base.Header) h := make(base.Header)
if s.ss.State() == gortsplib.ServerSessionStatePreRead { if s.ss.State() == gortsplib.ServerSessionStatePreRead {
s.path.OnReaderPlay(pathReaderPlayReq{Author: s}) s.path.onReaderPlay(pathReaderPlayReq{Author: s})
if s.path.Conf().RunOnRead != "" { if s.path.Conf().RunOnRead != "" {
s.log(logger.Info, "runOnRead command started") s.log(logger.Info, "runOnRead command started")
@@ -268,9 +268,9 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
}, nil }, nil
} }
// OnRecord is called by rtspServer. // onRecord is called by rtspServer.
func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.OnPublisherRecord(pathPublisherRecordReq{ res := s.path.onPublisherRecord(pathPublisherRecordReq{
Author: s, Author: s,
Tracks: s.announcedTracks, Tracks: s.announcedTracks,
}) })
@@ -291,8 +291,8 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}, nil }, nil
} }
// OnPause is called by rtspServer. // onPause is called by rtspServer.
func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) { func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
switch s.ss.State() { switch s.ss.State() {
case gortsplib.ServerSessionStateRead: case gortsplib.ServerSessionStateRead:
if s.onReadCmd != nil { if s.onReadCmd != nil {
@@ -300,14 +300,14 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
s.onReadCmd.Close() s.onReadCmd.Close()
} }
s.path.OnReaderPause(pathReaderPauseReq{Author: s}) s.path.onReaderPause(pathReaderPauseReq{Author: s})
s.stateMutex.Lock() s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePreRead s.state = gortsplib.ServerSessionStatePreRead
s.stateMutex.Unlock() s.stateMutex.Unlock()
case gortsplib.ServerSessionStatePublish: case gortsplib.ServerSessionStatePublish:
s.path.OnPublisherPause(pathPublisherPauseReq{Author: s}) s.path.onPublisherPause(pathPublisherPauseReq{Author: s})
s.stateMutex.Lock() s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePrePublish s.state = gortsplib.ServerSessionStatePrePublish
@@ -319,8 +319,8 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
}, nil }, nil
} }
// OnReaderAccepted implements reader. // onReaderAccepted implements reader.
func (s *rtspSession) OnReaderAccepted() { func (s *rtspSession) onReaderAccepted() {
tracksLen := len(s.ss.SetuppedTracks()) tracksLen := len(s.ss.SetuppedTracks())
s.log(logger.Info, "is reading from path '%s', %d %s with %s", s.log(logger.Info, "is reading from path '%s', %d %s with %s",
@@ -335,13 +335,13 @@ func (s *rtspSession) OnReaderAccepted() {
s.ss.SetuppedTransport()) s.ss.SetuppedTransport())
} }
// OnReaderFrame implements reader. // onReaderFrame implements reader.
func (s *rtspSession) OnReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { func (s *rtspSession) onReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
s.ss.WriteFrame(trackID, streamType, payload) s.ss.WriteFrame(trackID, streamType, payload)
} }
// OnReaderAPIDescribe implements reader. // onReaderAPIDescribe implements reader.
func (s *rtspSession) OnReaderAPIDescribe() interface{} { func (s *rtspSession) onReaderAPIDescribe() interface{} {
var typ string var typ string
if s.isTLS { if s.isTLS {
typ = "rtspsSession" typ = "rtspsSession"
@@ -355,8 +355,8 @@ func (s *rtspSession) OnReaderAPIDescribe() interface{} {
}{typ, s.id} }{typ, s.id}
} }
// OnSourceAPIDescribe implements source. // onSourceAPIDescribe implements source.
func (s *rtspSession) OnSourceAPIDescribe() interface{} { func (s *rtspSession) onSourceAPIDescribe() interface{} {
var typ string var typ string
if s.isTLS { if s.isTLS {
typ = "rtspsSession" typ = "rtspsSession"
@@ -370,8 +370,8 @@ func (s *rtspSession) OnSourceAPIDescribe() interface{} {
}{typ, s.id} }{typ, s.id}
} }
// OnPublisherAccepted implements publisher. // onPublisherAccepted implements publisher.
func (s *rtspSession) OnPublisherAccepted(tracksLen int) { func (s *rtspSession) onPublisherAccepted(tracksLen int) {
s.log(logger.Info, "is publishing to path '%s', %d %s with %s", s.log(logger.Info, "is publishing to path '%s', %d %s with %s",
s.path.Name(), s.path.Name(),
tracksLen, tracksLen,
@@ -384,8 +384,8 @@ func (s *rtspSession) OnPublisherAccepted(tracksLen int) {
s.ss.SetuppedTransport()) s.ss.SetuppedTransport())
} }
// OnFrame is called by rtspServer. // onFrame is called by rtspServer.
func (s *rtspSession) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { func (s *rtspSession) onFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
if s.ss.State() != gortsplib.ServerSessionStatePublish { if s.ss.State() != gortsplib.ServerSessionStatePublish {
return return
} }

View File

@@ -22,8 +22,8 @@ const (
) )
type rtspSourceParent interface { type rtspSourceParent interface {
Log(logger.Level, string, ...interface{}) log(logger.Level, string, ...interface{})
OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
} }
@@ -80,13 +80,13 @@ func newRTSPSource(
return s return s
} }
func (s *rtspSource) Close() { func (s *rtspSource) close() {
s.log(logger.Info, "stopped") s.log(logger.Info, "stopped")
s.ctxCancel() s.ctxCancel()
} }
func (s *rtspSource) log(level logger.Level, format string, args ...interface{}) { func (s *rtspSource) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtsp source] "+format, args...) s.parent.log(level, "[rtsp source] "+format, args...)
} }
func (s *rtspSource) run() { func (s *rtspSource) run() {
@@ -177,7 +177,7 @@ func (s *rtspSource) runInner() bool {
return true return true
} }
res := s.parent.OnSourceStaticSetReady(pathSourceStaticSetReadyReq{ res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s, Source: s,
Tracks: conn.Tracks(), Tracks: conn.Tracks(),
}) })
@@ -212,8 +212,8 @@ func (s *rtspSource) runInner() bool {
} }
} }
// OnSourceAPIDescribe implements source. // onSourceAPIDescribe implements source.
func (*rtspSource) OnSourceAPIDescribe() interface{} { func (*rtspSource) onSourceAPIDescribe() interface{} {
return struct { return struct {
Type string `json:"type"` Type string `json:"type"`
}{"rtspSource"} }{"rtspSource"}

View File

@@ -2,11 +2,11 @@ package core
// source is an entity that can provide a stream, statically or dynamically. // source is an entity that can provide a stream, statically or dynamically.
type source interface { type source interface {
OnSourceAPIDescribe() interface{} onSourceAPIDescribe() interface{}
} }
// sourceStatic is an entity that can provide a static stream. // sourceStatic is an entity that can provide a static stream.
type sourceStatic interface { type sourceStatic interface {
source source
Close() close()
} }

View File

@@ -40,7 +40,7 @@ func (m *streamNonRTSPReadersMap) forwardFrame(trackID int, streamType gortsplib
defer m.mutex.RUnlock() defer m.mutex.RUnlock()
for c := range m.ma { for c := range m.ma {
c.OnReaderFrame(trackID, streamType, payload) c.onReaderFrame(trackID, streamType, payload)
} }
} }