mirror of
https://github.com/aler9/rtsp-simple-server
synced 2025-09-30 13:32:11 +08:00

When hlsAlwaysRemux was switched from false to true, through API or hot reloading, muxers of existing paths were not created. This fixes the issue.
992 lines
25 KiB
Go
992 lines
25 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
|
|
|
"github.com/bluenviron/mediamtx/internal/conf"
|
|
"github.com/bluenviron/mediamtx/internal/defs"
|
|
"github.com/bluenviron/mediamtx/internal/externalcmd"
|
|
"github.com/bluenviron/mediamtx/internal/hooks"
|
|
"github.com/bluenviron/mediamtx/internal/logger"
|
|
"github.com/bluenviron/mediamtx/internal/recorder"
|
|
"github.com/bluenviron/mediamtx/internal/staticsources"
|
|
"github.com/bluenviron/mediamtx/internal/stream"
|
|
)
|
|
|
|
func emptyTimer() *time.Timer {
|
|
t := time.NewTimer(0)
|
|
<-t.C
|
|
return t
|
|
}
|
|
|
|
type pathParent interface {
|
|
logger.Writer
|
|
pathReady(*path)
|
|
pathNotReady(*path)
|
|
closePath(*path)
|
|
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
|
|
}
|
|
|
|
type pathOnDemandState int
|
|
|
|
const (
|
|
pathOnDemandStateInitial pathOnDemandState = iota
|
|
pathOnDemandStateWaitingReady
|
|
pathOnDemandStateReady
|
|
pathOnDemandStateClosing
|
|
)
|
|
|
|
type pathAPIPathsListRes struct {
|
|
data *defs.APIPathList
|
|
paths map[string]*path
|
|
}
|
|
|
|
type pathAPIPathsListReq struct {
|
|
res chan pathAPIPathsListRes
|
|
}
|
|
|
|
type pathAPIPathsGetRes struct {
|
|
path *path
|
|
data *defs.APIPath
|
|
err error
|
|
}
|
|
|
|
type pathAPIPathsGetReq struct {
|
|
name string
|
|
res chan pathAPIPathsGetRes
|
|
}
|
|
|
|
type path struct {
|
|
parentCtx context.Context
|
|
logLevel conf.LogLevel
|
|
rtspAddress string
|
|
readTimeout conf.Duration
|
|
writeTimeout conf.Duration
|
|
writeQueueSize int
|
|
udpMaxPayloadSize int
|
|
conf *conf.Path
|
|
name string
|
|
matches []string
|
|
wg *sync.WaitGroup
|
|
externalCmdPool *externalcmd.Pool
|
|
parent pathParent
|
|
|
|
ctx context.Context
|
|
ctxCancel func()
|
|
confMutex sync.RWMutex
|
|
source defs.Source
|
|
publisherQuery string
|
|
stream *stream.Stream
|
|
recorder *recorder.Recorder
|
|
readyTime time.Time
|
|
onUnDemandHook func(string)
|
|
onNotReadyHook func()
|
|
readers map[defs.Reader]struct{}
|
|
describeRequestsOnHold []defs.PathDescribeReq
|
|
readerAddRequestsOnHold []defs.PathAddReaderReq
|
|
onDemandStaticSourceState pathOnDemandState
|
|
onDemandStaticSourceReadyTimer *time.Timer
|
|
onDemandStaticSourceCloseTimer *time.Timer
|
|
onDemandPublisherState pathOnDemandState
|
|
onDemandPublisherReadyTimer *time.Timer
|
|
onDemandPublisherCloseTimer *time.Timer
|
|
|
|
// in
|
|
chReloadConf chan *conf.Path
|
|
chStaticSourceSetReady chan defs.PathSourceStaticSetReadyReq
|
|
chStaticSourceSetNotReady chan defs.PathSourceStaticSetNotReadyReq
|
|
chDescribe chan defs.PathDescribeReq
|
|
chAddPublisher chan defs.PathAddPublisherReq
|
|
chRemovePublisher chan defs.PathRemovePublisherReq
|
|
chStartPublisher chan defs.PathStartPublisherReq
|
|
chStopPublisher chan defs.PathStopPublisherReq
|
|
chAddReader chan defs.PathAddReaderReq
|
|
chRemoveReader chan defs.PathRemoveReaderReq
|
|
chAPIPathsGet chan pathAPIPathsGetReq
|
|
|
|
// out
|
|
done chan struct{}
|
|
}
|
|
|
|
func (pa *path) initialize() {
|
|
ctx, ctxCancel := context.WithCancel(pa.parentCtx)
|
|
|
|
pa.ctx = ctx
|
|
pa.ctxCancel = ctxCancel
|
|
pa.readers = make(map[defs.Reader]struct{})
|
|
pa.onDemandStaticSourceReadyTimer = emptyTimer()
|
|
pa.onDemandStaticSourceCloseTimer = emptyTimer()
|
|
pa.onDemandPublisherReadyTimer = emptyTimer()
|
|
pa.onDemandPublisherCloseTimer = emptyTimer()
|
|
pa.chReloadConf = make(chan *conf.Path)
|
|
pa.chStaticSourceSetReady = make(chan defs.PathSourceStaticSetReadyReq)
|
|
pa.chStaticSourceSetNotReady = make(chan defs.PathSourceStaticSetNotReadyReq)
|
|
pa.chDescribe = make(chan defs.PathDescribeReq)
|
|
pa.chAddPublisher = make(chan defs.PathAddPublisherReq)
|
|
pa.chRemovePublisher = make(chan defs.PathRemovePublisherReq)
|
|
pa.chStartPublisher = make(chan defs.PathStartPublisherReq)
|
|
pa.chStopPublisher = make(chan defs.PathStopPublisherReq)
|
|
pa.chAddReader = make(chan defs.PathAddReaderReq)
|
|
pa.chRemoveReader = make(chan defs.PathRemoveReaderReq)
|
|
pa.chAPIPathsGet = make(chan pathAPIPathsGetReq)
|
|
pa.done = make(chan struct{})
|
|
|
|
pa.Log(logger.Debug, "created")
|
|
|
|
pa.wg.Add(1)
|
|
go pa.run()
|
|
}
|
|
|
|
func (pa *path) close() {
|
|
pa.ctxCancel()
|
|
}
|
|
|
|
func (pa *path) wait() {
|
|
<-pa.done
|
|
}
|
|
|
|
// Log implements logger.Writer.
|
|
func (pa *path) Log(level logger.Level, format string, args ...interface{}) {
|
|
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...)
|
|
}
|
|
|
|
func (pa *path) Name() string {
|
|
return pa.name
|
|
}
|
|
|
|
func (pa *path) isReady() bool {
|
|
return pa.stream != nil
|
|
}
|
|
|
|
func (pa *path) run() {
|
|
defer close(pa.done)
|
|
defer pa.wg.Done()
|
|
|
|
if pa.conf.Source == "redirect" {
|
|
pa.source = &sourceRedirect{}
|
|
} else if pa.conf.HasStaticSource() {
|
|
pa.source = &staticsources.Handler{
|
|
Conf: pa.conf,
|
|
LogLevel: pa.logLevel,
|
|
ReadTimeout: pa.readTimeout,
|
|
WriteTimeout: pa.writeTimeout,
|
|
WriteQueueSize: pa.writeQueueSize,
|
|
Matches: pa.matches,
|
|
PathManager: pa.parent,
|
|
Parent: pa,
|
|
}
|
|
pa.source.(*staticsources.Handler).Initialize()
|
|
|
|
if !pa.conf.SourceOnDemand {
|
|
pa.source.(*staticsources.Handler).Start(false, "")
|
|
}
|
|
}
|
|
|
|
onUnInitHook := hooks.OnInit(hooks.OnInitParams{
|
|
Logger: pa,
|
|
ExternalCmdPool: pa.externalCmdPool,
|
|
Conf: pa.conf,
|
|
ExternalCmdEnv: pa.ExternalCmdEnv(),
|
|
})
|
|
|
|
err := pa.runInner()
|
|
|
|
// call before destroying context
|
|
pa.parent.closePath(pa)
|
|
|
|
pa.ctxCancel()
|
|
|
|
pa.onDemandStaticSourceReadyTimer.Stop()
|
|
pa.onDemandStaticSourceCloseTimer.Stop()
|
|
pa.onDemandPublisherReadyTimer.Stop()
|
|
pa.onDemandPublisherCloseTimer.Stop()
|
|
|
|
onUnInitHook()
|
|
|
|
for _, req := range pa.describeRequestsOnHold {
|
|
req.Res <- defs.PathDescribeRes{Err: fmt.Errorf("terminated")}
|
|
}
|
|
|
|
for _, req := range pa.readerAddRequestsOnHold {
|
|
req.Res <- defs.PathAddReaderRes{Err: fmt.Errorf("terminated")}
|
|
}
|
|
|
|
if pa.stream != nil {
|
|
pa.setNotReady()
|
|
}
|
|
|
|
if pa.source != nil {
|
|
if source, ok := pa.source.(*staticsources.Handler); ok {
|
|
if !pa.conf.SourceOnDemand || pa.onDemandStaticSourceState != pathOnDemandStateInitial {
|
|
source.Close("path is closing")
|
|
}
|
|
} else if source, ok := pa.source.(defs.Publisher); ok {
|
|
source.Close()
|
|
}
|
|
}
|
|
|
|
if pa.onUnDemandHook != nil {
|
|
pa.onUnDemandHook("path destroyed")
|
|
}
|
|
|
|
pa.Log(logger.Debug, "destroyed: %v", err)
|
|
}
|
|
|
|
func (pa *path) runInner() error {
|
|
for {
|
|
select {
|
|
case <-pa.onDemandStaticSourceReadyTimer.C:
|
|
pa.doOnDemandStaticSourceReadyTimer()
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case <-pa.onDemandStaticSourceCloseTimer.C:
|
|
pa.doOnDemandStaticSourceCloseTimer()
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case <-pa.onDemandPublisherReadyTimer.C:
|
|
pa.doOnDemandPublisherReadyTimer()
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case <-pa.onDemandPublisherCloseTimer.C:
|
|
pa.doOnDemandPublisherCloseTimer()
|
|
|
|
case newConf := <-pa.chReloadConf:
|
|
pa.doReloadConf(newConf)
|
|
|
|
case req := <-pa.chStaticSourceSetReady:
|
|
pa.doSourceStaticSetReady(req)
|
|
|
|
case req := <-pa.chStaticSourceSetNotReady:
|
|
pa.doSourceStaticSetNotReady(req)
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case req := <-pa.chDescribe:
|
|
pa.doDescribe(req)
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case req := <-pa.chAddPublisher:
|
|
pa.doAddPublisher(req)
|
|
|
|
case req := <-pa.chRemovePublisher:
|
|
pa.doRemovePublisher(req)
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case req := <-pa.chStartPublisher:
|
|
pa.doStartPublisher(req)
|
|
|
|
case req := <-pa.chStopPublisher:
|
|
pa.doStopPublisher(req)
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case req := <-pa.chAddReader:
|
|
pa.doAddReader(req)
|
|
|
|
if pa.shouldClose() {
|
|
return fmt.Errorf("not in use")
|
|
}
|
|
|
|
case req := <-pa.chRemoveReader:
|
|
pa.doRemoveReader(req)
|
|
|
|
case req := <-pa.chAPIPathsGet:
|
|
pa.doAPIPathsGet(req)
|
|
|
|
case <-pa.ctx.Done():
|
|
return fmt.Errorf("terminated")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pa *path) doOnDemandStaticSourceReadyTimer() {
|
|
for _, req := range pa.describeRequestsOnHold {
|
|
req.Res <- defs.PathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
|
}
|
|
pa.describeRequestsOnHold = nil
|
|
|
|
for _, req := range pa.readerAddRequestsOnHold {
|
|
req.Res <- defs.PathAddReaderRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
|
}
|
|
pa.readerAddRequestsOnHold = nil
|
|
|
|
pa.onDemandStaticSourceStop("timed out")
|
|
}
|
|
|
|
func (pa *path) doOnDemandStaticSourceCloseTimer() {
|
|
pa.setNotReady()
|
|
pa.onDemandStaticSourceStop("not needed by anyone")
|
|
}
|
|
|
|
func (pa *path) doOnDemandPublisherReadyTimer() {
|
|
for _, req := range pa.describeRequestsOnHold {
|
|
req.Res <- defs.PathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
|
}
|
|
pa.describeRequestsOnHold = nil
|
|
|
|
for _, req := range pa.readerAddRequestsOnHold {
|
|
req.Res <- defs.PathAddReaderRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
|
}
|
|
pa.readerAddRequestsOnHold = nil
|
|
|
|
pa.onDemandPublisherStop("timed out")
|
|
}
|
|
|
|
func (pa *path) doOnDemandPublisherCloseTimer() {
|
|
pa.onDemandPublisherStop("not needed by anyone")
|
|
}
|
|
|
|
func (pa *path) doReloadConf(newConf *conf.Path) {
|
|
pa.confMutex.Lock()
|
|
pa.conf = newConf
|
|
pa.confMutex.Unlock()
|
|
|
|
if pa.conf.HasStaticSource() {
|
|
pa.source.(*staticsources.Handler).ReloadConf(newConf)
|
|
}
|
|
|
|
if pa.conf.Record {
|
|
if pa.stream != nil && pa.recorder == nil {
|
|
pa.startRecording()
|
|
}
|
|
} else if pa.recorder != nil {
|
|
pa.recorder.Close()
|
|
pa.recorder = nil
|
|
}
|
|
}
|
|
|
|
func (pa *path) doSourceStaticSetReady(req defs.PathSourceStaticSetReadyReq) {
|
|
err := pa.setReady(req.Desc, req.GenerateRTPPackets)
|
|
if err != nil {
|
|
req.Res <- defs.PathSourceStaticSetReadyRes{Err: err}
|
|
return
|
|
}
|
|
|
|
if pa.conf.HasOnDemandStaticSource() {
|
|
pa.onDemandStaticSourceReadyTimer.Stop()
|
|
pa.onDemandStaticSourceReadyTimer = emptyTimer()
|
|
pa.onDemandStaticSourceScheduleClose()
|
|
}
|
|
|
|
pa.consumeOnHoldRequests()
|
|
|
|
req.Res <- defs.PathSourceStaticSetReadyRes{Stream: pa.stream}
|
|
}
|
|
|
|
func (pa *path) doSourceStaticSetNotReady(req defs.PathSourceStaticSetNotReadyReq) {
|
|
pa.setNotReady()
|
|
|
|
// send response before calling onDemandStaticSourceStop()
|
|
// in order to avoid a deadlock due to staticsources.Handler.stop()
|
|
close(req.Res)
|
|
|
|
if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
|
|
pa.onDemandStaticSourceStop("an error occurred")
|
|
}
|
|
}
|
|
|
|
func (pa *path) doDescribe(req defs.PathDescribeReq) {
|
|
if _, ok := pa.source.(*sourceRedirect); ok {
|
|
req.Res <- defs.PathDescribeRes{
|
|
Redirect: pa.conf.SourceRedirect,
|
|
}
|
|
return
|
|
}
|
|
|
|
if pa.stream != nil {
|
|
req.Res <- defs.PathDescribeRes{
|
|
Stream: pa.stream,
|
|
}
|
|
return
|
|
}
|
|
|
|
if pa.conf.HasOnDemandStaticSource() {
|
|
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
|
|
pa.onDemandStaticSourceStart(req.AccessRequest.Query)
|
|
}
|
|
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
|
|
return
|
|
}
|
|
|
|
if pa.conf.HasOnDemandPublisher() {
|
|
if pa.onDemandPublisherState == pathOnDemandStateInitial {
|
|
pa.onDemandPublisherStart(req.AccessRequest.Query)
|
|
}
|
|
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
|
|
return
|
|
}
|
|
|
|
if pa.conf.Fallback != "" {
|
|
req.Res <- defs.PathDescribeRes{Redirect: pa.conf.Fallback}
|
|
return
|
|
}
|
|
|
|
req.Res <- defs.PathDescribeRes{Err: defs.PathNoStreamAvailableError{PathName: pa.name}}
|
|
}
|
|
|
|
func (pa *path) doRemovePublisher(req defs.PathRemovePublisherReq) {
|
|
if pa.source == req.Author {
|
|
pa.executeRemovePublisher()
|
|
}
|
|
close(req.Res)
|
|
}
|
|
|
|
func (pa *path) doAddPublisher(req defs.PathAddPublisherReq) {
|
|
if pa.conf.Source != "publisher" {
|
|
req.Res <- defs.PathAddPublisherRes{
|
|
Err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name),
|
|
}
|
|
return
|
|
}
|
|
|
|
if pa.source != nil {
|
|
if !pa.conf.OverridePublisher {
|
|
req.Res <- defs.PathAddPublisherRes{Err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)}
|
|
return
|
|
}
|
|
|
|
pa.Log(logger.Info, "closing existing publisher")
|
|
pa.source.(defs.Publisher).Close()
|
|
pa.executeRemovePublisher()
|
|
}
|
|
|
|
pa.source = req.Author
|
|
pa.publisherQuery = req.AccessRequest.Query
|
|
|
|
req.Res <- defs.PathAddPublisherRes{Path: pa}
|
|
}
|
|
|
|
func (pa *path) doStartPublisher(req defs.PathStartPublisherReq) {
|
|
if pa.source != req.Author {
|
|
req.Res <- defs.PathStartPublisherRes{Err: fmt.Errorf("publisher is not assigned to this path anymore")}
|
|
return
|
|
}
|
|
|
|
err := pa.setReady(req.Desc, req.GenerateRTPPackets)
|
|
if err != nil {
|
|
req.Res <- defs.PathStartPublisherRes{Err: err}
|
|
return
|
|
}
|
|
|
|
req.Author.Log(logger.Info, "is publishing to path '%s', %s",
|
|
pa.name,
|
|
defs.MediasInfo(req.Desc.Medias))
|
|
|
|
if pa.conf.HasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
|
|
pa.onDemandPublisherReadyTimer.Stop()
|
|
pa.onDemandPublisherReadyTimer = emptyTimer()
|
|
pa.onDemandPublisherScheduleClose()
|
|
}
|
|
|
|
pa.consumeOnHoldRequests()
|
|
|
|
req.Res <- defs.PathStartPublisherRes{Stream: pa.stream}
|
|
}
|
|
|
|
func (pa *path) doStopPublisher(req defs.PathStopPublisherReq) {
|
|
if req.Author == pa.source && pa.stream != nil {
|
|
pa.setNotReady()
|
|
}
|
|
close(req.Res)
|
|
}
|
|
|
|
func (pa *path) doAddReader(req defs.PathAddReaderReq) {
|
|
if pa.stream != nil {
|
|
pa.addReaderPost(req)
|
|
return
|
|
}
|
|
|
|
if pa.conf.HasOnDemandStaticSource() {
|
|
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
|
|
pa.onDemandStaticSourceStart(req.AccessRequest.Query)
|
|
}
|
|
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req)
|
|
return
|
|
}
|
|
|
|
if pa.conf.HasOnDemandPublisher() {
|
|
if pa.onDemandPublisherState == pathOnDemandStateInitial {
|
|
pa.onDemandPublisherStart(req.AccessRequest.Query)
|
|
}
|
|
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req)
|
|
return
|
|
}
|
|
|
|
req.Res <- defs.PathAddReaderRes{Err: defs.PathNoStreamAvailableError{PathName: pa.name}}
|
|
}
|
|
|
|
func (pa *path) doRemoveReader(req defs.PathRemoveReaderReq) {
|
|
if _, ok := pa.readers[req.Author]; ok {
|
|
pa.executeRemoveReader(req.Author)
|
|
}
|
|
close(req.Res)
|
|
|
|
if len(pa.readers) == 0 {
|
|
if pa.conf.HasOnDemandStaticSource() {
|
|
if pa.onDemandStaticSourceState == pathOnDemandStateReady {
|
|
pa.onDemandStaticSourceScheduleClose()
|
|
}
|
|
} else if pa.conf.HasOnDemandPublisher() {
|
|
if pa.onDemandPublisherState == pathOnDemandStateReady {
|
|
pa.onDemandPublisherScheduleClose()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
|
|
req.res <- pathAPIPathsGetRes{
|
|
data: &defs.APIPath{
|
|
Name: pa.name,
|
|
ConfName: pa.conf.Name,
|
|
Source: func() *defs.APIPathSourceOrReader {
|
|
if pa.source == nil {
|
|
return nil
|
|
}
|
|
v := pa.source.APISourceDescribe()
|
|
return &v
|
|
}(),
|
|
Ready: pa.isReady(),
|
|
ReadyTime: func() *time.Time {
|
|
if !pa.isReady() {
|
|
return nil
|
|
}
|
|
v := pa.readyTime
|
|
return &v
|
|
}(),
|
|
Tracks: func() []string {
|
|
if !pa.isReady() {
|
|
return []string{}
|
|
}
|
|
return defs.MediasToCodecs(pa.stream.Desc.Medias)
|
|
}(),
|
|
BytesReceived: func() uint64 {
|
|
if !pa.isReady() {
|
|
return 0
|
|
}
|
|
return pa.stream.BytesReceived()
|
|
}(),
|
|
BytesSent: func() uint64 {
|
|
if !pa.isReady() {
|
|
return 0
|
|
}
|
|
return pa.stream.BytesSent()
|
|
}(),
|
|
Readers: func() []defs.APIPathSourceOrReader {
|
|
ret := []defs.APIPathSourceOrReader{}
|
|
for r := range pa.readers {
|
|
ret = append(ret, r.APIReaderDescribe())
|
|
}
|
|
return ret
|
|
}(),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (pa *path) SafeConf() *conf.Path {
|
|
pa.confMutex.RLock()
|
|
defer pa.confMutex.RUnlock()
|
|
return pa.conf
|
|
}
|
|
|
|
func (pa *path) ExternalCmdEnv() externalcmd.Environment {
|
|
_, port, _ := net.SplitHostPort(pa.rtspAddress)
|
|
env := externalcmd.Environment{
|
|
"MTX_PATH": pa.name,
|
|
"RTSP_PATH": pa.name, // deprecated
|
|
"RTSP_PORT": port,
|
|
}
|
|
|
|
if len(pa.matches) > 1 {
|
|
for i, ma := range pa.matches[1:] {
|
|
env["G"+strconv.FormatInt(int64(i+1), 10)] = ma
|
|
}
|
|
}
|
|
|
|
return env
|
|
}
|
|
|
|
func (pa *path) shouldClose() bool {
|
|
return pa.conf.Regexp != nil &&
|
|
pa.source == nil &&
|
|
len(pa.readers) == 0 &&
|
|
len(pa.describeRequestsOnHold) == 0 &&
|
|
len(pa.readerAddRequestsOnHold) == 0
|
|
}
|
|
|
|
func (pa *path) onDemandStaticSourceStart(query string) {
|
|
pa.source.(*staticsources.Handler).Start(true, query)
|
|
|
|
pa.onDemandStaticSourceReadyTimer.Stop()
|
|
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
|
|
|
|
pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady
|
|
}
|
|
|
|
func (pa *path) onDemandStaticSourceScheduleClose() {
|
|
pa.onDemandStaticSourceCloseTimer.Stop()
|
|
pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter))
|
|
|
|
pa.onDemandStaticSourceState = pathOnDemandStateClosing
|
|
}
|
|
|
|
func (pa *path) onDemandStaticSourceStop(reason string) {
|
|
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
|
|
pa.onDemandStaticSourceCloseTimer.Stop()
|
|
pa.onDemandStaticSourceCloseTimer = emptyTimer()
|
|
}
|
|
|
|
pa.onDemandStaticSourceState = pathOnDemandStateInitial
|
|
|
|
pa.source.(*staticsources.Handler).Stop(reason)
|
|
}
|
|
|
|
func (pa *path) onDemandPublisherStart(query string) {
|
|
pa.onUnDemandHook = hooks.OnDemand(hooks.OnDemandParams{
|
|
Logger: pa,
|
|
ExternalCmdPool: pa.externalCmdPool,
|
|
Conf: pa.conf,
|
|
ExternalCmdEnv: pa.ExternalCmdEnv(),
|
|
Query: query,
|
|
})
|
|
|
|
pa.onDemandPublisherReadyTimer.Stop()
|
|
pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
|
|
|
|
pa.onDemandPublisherState = pathOnDemandStateWaitingReady
|
|
}
|
|
|
|
func (pa *path) onDemandPublisherScheduleClose() {
|
|
pa.onDemandPublisherCloseTimer.Stop()
|
|
pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter))
|
|
|
|
pa.onDemandPublisherState = pathOnDemandStateClosing
|
|
}
|
|
|
|
func (pa *path) onDemandPublisherStop(reason string) {
|
|
if pa.onDemandPublisherState == pathOnDemandStateClosing {
|
|
pa.onDemandPublisherCloseTimer.Stop()
|
|
pa.onDemandPublisherCloseTimer = emptyTimer()
|
|
}
|
|
|
|
pa.onUnDemandHook(reason)
|
|
pa.onUnDemandHook = nil
|
|
|
|
pa.onDemandPublisherState = pathOnDemandStateInitial
|
|
}
|
|
|
|
func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error {
|
|
pa.stream = &stream.Stream{
|
|
WriteQueueSize: pa.writeQueueSize,
|
|
UDPMaxPayloadSize: pa.udpMaxPayloadSize,
|
|
Desc: desc,
|
|
GenerateRTPPackets: allocateEncoder,
|
|
Parent: pa.source,
|
|
}
|
|
err := pa.stream.Initialize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if pa.conf.Record {
|
|
pa.startRecording()
|
|
}
|
|
|
|
pa.readyTime = time.Now()
|
|
|
|
pa.onNotReadyHook = hooks.OnReady(hooks.OnReadyParams{
|
|
Logger: pa,
|
|
ExternalCmdPool: pa.externalCmdPool,
|
|
Conf: pa.conf,
|
|
ExternalCmdEnv: pa.ExternalCmdEnv(),
|
|
Desc: pa.source.APISourceDescribe(),
|
|
Query: pa.publisherQuery,
|
|
})
|
|
|
|
pa.parent.pathReady(pa)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (pa *path) consumeOnHoldRequests() {
|
|
for _, req := range pa.describeRequestsOnHold {
|
|
req.Res <- defs.PathDescribeRes{
|
|
Stream: pa.stream,
|
|
}
|
|
}
|
|
pa.describeRequestsOnHold = nil
|
|
|
|
for _, req := range pa.readerAddRequestsOnHold {
|
|
pa.addReaderPost(req)
|
|
}
|
|
pa.readerAddRequestsOnHold = nil
|
|
}
|
|
|
|
func (pa *path) setNotReady() {
|
|
pa.parent.pathNotReady(pa)
|
|
|
|
for r := range pa.readers {
|
|
pa.executeRemoveReader(r)
|
|
r.Close()
|
|
}
|
|
|
|
pa.onNotReadyHook()
|
|
|
|
if pa.recorder != nil {
|
|
pa.recorder.Close()
|
|
pa.recorder = nil
|
|
}
|
|
|
|
if pa.stream != nil {
|
|
pa.stream.Close()
|
|
pa.stream = nil
|
|
}
|
|
}
|
|
|
|
func (pa *path) startRecording() {
|
|
pa.recorder = &recorder.Recorder{
|
|
PathFormat: pa.conf.RecordPath,
|
|
Format: pa.conf.RecordFormat,
|
|
PartDuration: time.Duration(pa.conf.RecordPartDuration),
|
|
SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration),
|
|
PathName: pa.name,
|
|
Stream: pa.stream,
|
|
OnSegmentCreate: func(segmentPath string) {
|
|
if pa.conf.RunOnRecordSegmentCreate != "" {
|
|
env := pa.ExternalCmdEnv()
|
|
env["MTX_SEGMENT_PATH"] = segmentPath
|
|
|
|
pa.Log(logger.Info, "runOnRecordSegmentCreate command launched")
|
|
externalcmd.NewCmd(
|
|
pa.externalCmdPool,
|
|
pa.conf.RunOnRecordSegmentCreate,
|
|
false,
|
|
env,
|
|
nil)
|
|
}
|
|
},
|
|
OnSegmentComplete: func(segmentPath string, segmentDuration time.Duration) {
|
|
if pa.conf.RunOnRecordSegmentComplete != "" {
|
|
env := pa.ExternalCmdEnv()
|
|
env["MTX_SEGMENT_PATH"] = segmentPath
|
|
env["MTX_SEGMENT_DURATION"] = strconv.FormatFloat(segmentDuration.Seconds(), 'f', -1, 64)
|
|
|
|
pa.Log(logger.Info, "runOnRecordSegmentComplete command launched")
|
|
externalcmd.NewCmd(
|
|
pa.externalCmdPool,
|
|
pa.conf.RunOnRecordSegmentComplete,
|
|
false,
|
|
env,
|
|
nil)
|
|
}
|
|
},
|
|
Parent: pa,
|
|
}
|
|
pa.recorder.Initialize()
|
|
}
|
|
|
|
func (pa *path) executeRemoveReader(r defs.Reader) {
|
|
delete(pa.readers, r)
|
|
}
|
|
|
|
func (pa *path) executeRemovePublisher() {
|
|
if pa.stream != nil {
|
|
pa.setNotReady()
|
|
}
|
|
|
|
pa.source = nil
|
|
}
|
|
|
|
func (pa *path) addReaderPost(req defs.PathAddReaderReq) {
|
|
if _, ok := pa.readers[req.Author]; ok {
|
|
req.Res <- defs.PathAddReaderRes{
|
|
Path: pa,
|
|
Stream: pa.stream,
|
|
}
|
|
return
|
|
}
|
|
|
|
if pa.conf.MaxReaders != 0 && len(pa.readers) >= pa.conf.MaxReaders {
|
|
req.Res <- defs.PathAddReaderRes{Err: fmt.Errorf("maximum reader count reached")}
|
|
return
|
|
}
|
|
|
|
pa.readers[req.Author] = struct{}{}
|
|
|
|
if pa.conf.HasOnDemandStaticSource() {
|
|
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
|
|
pa.onDemandStaticSourceState = pathOnDemandStateReady
|
|
pa.onDemandStaticSourceCloseTimer.Stop()
|
|
pa.onDemandStaticSourceCloseTimer = emptyTimer()
|
|
}
|
|
} else if pa.conf.HasOnDemandPublisher() {
|
|
if pa.onDemandPublisherState == pathOnDemandStateClosing {
|
|
pa.onDemandPublisherState = pathOnDemandStateReady
|
|
pa.onDemandPublisherCloseTimer.Stop()
|
|
pa.onDemandPublisherCloseTimer = emptyTimer()
|
|
}
|
|
}
|
|
|
|
req.Res <- defs.PathAddReaderRes{
|
|
Path: pa,
|
|
Stream: pa.stream,
|
|
}
|
|
}
|
|
|
|
// reloadConf is called by pathManager.
|
|
func (pa *path) reloadConf(newConf *conf.Path) {
|
|
select {
|
|
case pa.chReloadConf <- newConf:
|
|
case <-pa.ctx.Done():
|
|
}
|
|
}
|
|
|
|
// StaticSourceHandlerSetReady is called by staticsources.Handler.
|
|
func (pa *path) StaticSourceHandlerSetReady(
|
|
ctx context.Context, req defs.PathSourceStaticSetReadyReq,
|
|
) {
|
|
select {
|
|
case pa.chStaticSourceSetReady <- req:
|
|
|
|
case <-pa.ctx.Done():
|
|
req.Res <- defs.PathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
|
|
|
|
// this avoids:
|
|
// - invalid requests sent after the source has been terminated
|
|
// - deadlocks caused by <-done inside stop()
|
|
case <-ctx.Done():
|
|
req.Res <- defs.PathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
|
|
}
|
|
}
|
|
|
|
// StaticSourceHandlerSetNotReady is called by staticsources.Handler.
|
|
func (pa *path) StaticSourceHandlerSetNotReady(
|
|
ctx context.Context, req defs.PathSourceStaticSetNotReadyReq,
|
|
) {
|
|
select {
|
|
case pa.chStaticSourceSetNotReady <- req:
|
|
|
|
case <-pa.ctx.Done():
|
|
close(req.Res)
|
|
|
|
// this avoids:
|
|
// - invalid requests sent after the source has been terminated
|
|
// - deadlocks caused by <-done inside stop()
|
|
case <-ctx.Done():
|
|
close(req.Res)
|
|
}
|
|
}
|
|
|
|
// describe is called by a reader or publisher through pathManager.
|
|
func (pa *path) describe(req defs.PathDescribeReq) defs.PathDescribeRes {
|
|
select {
|
|
case pa.chDescribe <- req:
|
|
return <-req.Res
|
|
case <-pa.ctx.Done():
|
|
return defs.PathDescribeRes{Err: fmt.Errorf("terminated")}
|
|
}
|
|
}
|
|
|
|
// addPublisher is called by a publisher through pathManager.
|
|
func (pa *path) addPublisher(req defs.PathAddPublisherReq) (defs.Path, error) {
|
|
select {
|
|
case pa.chAddPublisher <- req:
|
|
res := <-req.Res
|
|
return res.Path, res.Err
|
|
case <-pa.ctx.Done():
|
|
return nil, fmt.Errorf("terminated")
|
|
}
|
|
}
|
|
|
|
// RemovePublisher is called by a publisher.
|
|
func (pa *path) RemovePublisher(req defs.PathRemovePublisherReq) {
|
|
req.Res = make(chan struct{})
|
|
select {
|
|
case pa.chRemovePublisher <- req:
|
|
<-req.Res
|
|
case <-pa.ctx.Done():
|
|
}
|
|
}
|
|
|
|
// StartPublisher is called by a publisher.
|
|
func (pa *path) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
|
|
req.Res = make(chan defs.PathStartPublisherRes)
|
|
select {
|
|
case pa.chStartPublisher <- req:
|
|
res := <-req.Res
|
|
return res.Stream, res.Err
|
|
case <-pa.ctx.Done():
|
|
return nil, fmt.Errorf("terminated")
|
|
}
|
|
}
|
|
|
|
// StopPublisher is called by a publisher.
|
|
func (pa *path) StopPublisher(req defs.PathStopPublisherReq) {
|
|
req.Res = make(chan struct{})
|
|
select {
|
|
case pa.chStopPublisher <- req:
|
|
<-req.Res
|
|
case <-pa.ctx.Done():
|
|
}
|
|
}
|
|
|
|
// addReader is called by a reader through pathManager.
|
|
func (pa *path) addReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
|
select {
|
|
case pa.chAddReader <- req:
|
|
res := <-req.Res
|
|
return res.Path, res.Stream, res.Err
|
|
case <-pa.ctx.Done():
|
|
return nil, nil, fmt.Errorf("terminated")
|
|
}
|
|
}
|
|
|
|
// RemoveReader is called by a reader.
|
|
func (pa *path) RemoveReader(req defs.PathRemoveReaderReq) {
|
|
req.Res = make(chan struct{})
|
|
select {
|
|
case pa.chRemoveReader <- req:
|
|
<-req.Res
|
|
case <-pa.ctx.Done():
|
|
}
|
|
}
|
|
|
|
// APIPathsGet is called by api.
|
|
func (pa *path) APIPathsGet(req pathAPIPathsGetReq) (*defs.APIPath, error) {
|
|
req.res = make(chan pathAPIPathsGetRes)
|
|
select {
|
|
case pa.chAPIPathsGet <- req:
|
|
res := <-req.res
|
|
return res.data, res.err
|
|
|
|
case <-pa.ctx.Done():
|
|
return nil, fmt.Errorf("terminated")
|
|
}
|
|
}
|