Files
core/restream/restream.go
2023-04-25 15:55:32 +02:00

1897 lines
45 KiB
Go

package restream
import (
"context"
"errors"
"fmt"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/datarhei/core/v16/ffmpeg"
"github.com/datarhei/core/v16/ffmpeg/parse"
"github.com/datarhei/core/v16/ffmpeg/probe"
"github.com/datarhei/core/v16/ffmpeg/skills"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/net/url"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/restream/app"
rfs "github.com/datarhei/core/v16/restream/fs"
"github.com/datarhei/core/v16/restream/replace"
"github.com/datarhei/core/v16/restream/store"
"github.com/Masterminds/semver/v3"
)
// The Restreamer interface
type Restreamer interface {
ID() string // ID of this instance
Name() string // Arbitrary name of this instance
CreatedAt() time.Time // Time of when this instance has been created
Start() // Start all processes that have a "start" order
Stop() // Stop all running process but keep their "start" order
AddProcess(config *app.Config) error // Add a new process
GetProcessIDs(idpattern, refpattern string) []string // Get a list of process IDs based on patterns for ID and reference
DeleteProcess(id string) error // Delete a process
UpdateProcess(id string, config *app.Config) error // Update a process
StartProcess(id string) error // Start a process
StopProcess(id string) error // Stop a process
RestartProcess(id string) error // Restart a process
ReloadProcess(id string) error // Reload a process
GetProcess(id string) (*app.Process, error) // Get a process
GetProcessState(id string) (*app.State, error) // Get the state of a process
GetProcessLog(id string) (*app.Log, error) // Get the logs of a process
SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.LogHistorySearchResult // Search the log history of all processes
GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process
Probe(id string) app.Probe // Probe a process
ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout
Skills() skills.Skills // Get the ffmpeg skills
ReloadSkills() error // Reload the ffmpeg skills
SetProcessMetadata(id, key string, data interface{}) error // Set metatdata to a process
GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process
SetMetadata(key string, data interface{}) error // Set general metadata
GetMetadata(key string) (interface{}, error) // Get previously set general metadata
}
// Config is the required configuration for a new restreamer instance.
type Config struct {
ID string
Name string
Store store.Store
Filesystems []fs.Filesystem
Replace replace.Replacer
FFmpeg ffmpeg.FFmpeg
MaxProcesses int64
Logger log.Logger
}
type task struct {
valid bool
id string // ID of the task/process
reference string
process *app.Process
config *app.Config
command []string // The actual command parameter for ffmpeg
ffmpeg process.Process
parser parse.Parser
playout map[string]int
logger log.Logger
usesDisk bool // Whether this task uses the disk
metadata map[string]interface{}
}
type restream struct {
id string
name string
createdAt time.Time
store store.Store
ffmpeg ffmpeg.FFmpeg
maxProc int64
nProc int64
fs struct {
list []rfs.Filesystem
stopObserver context.CancelFunc
}
replace replace.Replacer
tasks map[string]*task
logger log.Logger
metadata map[string]interface{}
lock sync.RWMutex
startOnce sync.Once
stopOnce sync.Once
}
// New returns a new instance that implements the Restreamer interface
func New(config Config) (Restreamer, error) {
r := &restream{
id: config.ID,
name: config.Name,
createdAt: time.Now(),
store: config.Store,
replace: config.Replace,
logger: config.Logger,
}
if r.logger == nil {
r.logger = log.New("")
}
if r.store == nil {
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
s, err := store.NewJSON(store.JSONConfig{
Filesystem: dummyfs,
})
if err != nil {
return nil, err
}
r.store = s
}
if len(config.Filesystems) == 0 {
return nil, fmt.Errorf("at least one filesystem must be provided")
}
for _, fs := range config.Filesystems {
fs := rfs.New(rfs.Config{
FS: fs,
Logger: r.logger.WithComponent("Cleanup"),
})
r.fs.list = append(r.fs.list, fs)
}
if r.replace == nil {
r.replace = replace.New()
}
r.ffmpeg = config.FFmpeg
if r.ffmpeg == nil {
return nil, fmt.Errorf("ffmpeg must be provided")
}
r.maxProc = config.MaxProcesses
if err := r.load(); err != nil {
return nil, fmt.Errorf("failed to load data from DB (%w)", err)
}
r.save()
r.stopOnce.Do(func() {})
return r, nil
}
func (r *restream) Start() {
r.startOnce.Do(func() {
r.lock.Lock()
defer r.lock.Unlock()
for id, t := range r.tasks {
if t.process.Order == "start" {
r.startProcess(id)
}
// The filesystem cleanup rules can be set
r.setCleanup(id, t.config)
}
ctx, cancel := context.WithCancel(context.Background())
r.fs.stopObserver = cancel
for _, fs := range r.fs.list {
fs.Start()
if fs.Type() == "disk" {
go r.observe(ctx, fs, 10*time.Second)
}
}
r.stopOnce = sync.Once{}
})
}
func (r *restream) Stop() {
r.stopOnce.Do(func() {
r.lock.Lock()
defer r.lock.Unlock()
// Stop the currently running processes without
// altering their order such that on a subsequent
// Start() they will get restarted.
for id, t := range r.tasks {
if t.ffmpeg != nil {
t.ffmpeg.Stop(true)
}
r.unsetCleanup(id)
}
r.fs.stopObserver()
// Stop the cleanup jobs
for _, fs := range r.fs.list {
fs.Stop()
}
r.startOnce = sync.Once{}
})
}
func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
size, limit := fs.Size()
isFull := false
if limit > 0 && size >= limit {
isFull = true
}
if isFull {
// Stop all tasks that write to this filesystem
r.lock.Lock()
for id, t := range r.tasks {
if !t.valid {
continue
}
if !t.usesDisk {
continue
}
if t.process.Order != "start" {
continue
}
r.logger.Warn().Log("Shutting down because filesystem is full")
r.stopProcess(id)
}
r.lock.Unlock()
}
}
}
}
func (r *restream) load() error {
data, err := r.store.Load()
if err != nil {
return err
}
tasks := make(map[string]*task)
skills := r.ffmpeg.Skills()
ffversion := skills.FFmpeg.Version
if v, err := semver.NewVersion(ffversion); err == nil {
// Remove the patch level for the constraint
ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor())
}
for id, process := range data.Process {
if len(process.Config.FFVersion) == 0 {
process.Config.FFVersion = "^" + ffversion
}
t := &task{
id: id,
reference: process.Reference,
process: process,
config: process.Config.Clone(),
logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"reference": process.Reference,
},
),
}
// Replace all placeholders in the config
resolveStaticPlaceholders(t.config, r.replace)
tasks[id] = t
}
for id, userdata := range data.Metadata.Process {
t, ok := tasks[id]
if !ok {
continue
}
t.metadata = userdata
}
// Now that all tasks are defined and all placeholders are
// replaced, we can resolve references and validate the
// inputs and outputs.
for _, t := range tasks {
// Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version
if c, err := semver.NewConstraint(t.config.FFVersion); err == nil {
if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil {
if !c.Check(v) {
r.logger.Warn().WithFields(log.Fields{
"id": t.id,
"constraint": t.config.FFVersion,
"version": skills.FFmpeg.Version,
}).WithError(fmt.Errorf("available FFmpeg version doesn't fit constraint; you have to update this process to adjust the constraint")).Log("")
}
} else {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("")
}
} else {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("")
}
err := r.resolveAddresses(tasks, t.config)
if err != nil {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring")
continue
}
// Validate config with all placeholders replaced. However, we need to take care
// that the config with the task keeps its dynamic placeholders for process starts.
config := t.config.Clone()
resolveDynamicPlaceholder(config, r.replace)
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring")
continue
}
err = r.setPlayoutPorts(t)
if err != nil {
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring")
continue
}
t.command = t.config.CreateCommand()
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
Timeout: time.Duration(t.config.Timeout) * time.Second,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()),
})
if err != nil {
return err
}
t.ffmpeg = ffmpeg
t.valid = true
}
r.tasks = tasks
r.metadata = data.Metadata.System
return nil
}
func (r *restream) save() {
data := store.NewStoreData()
for id, t := range r.tasks {
data.Process[id] = t.process
data.Metadata.System = r.metadata
data.Metadata.Process[id] = t.metadata
}
r.store.Store(data)
}
func (r *restream) ID() string {
return r.id
}
func (r *restream) Name() string {
return r.name
}
func (r *restream) CreatedAt() time.Time {
return r.createdAt
}
var ErrUnknownProcess = errors.New("unknown process")
var ErrProcessExists = errors.New("process already exists")
func (r *restream) AddProcess(config *app.Config) error {
r.lock.RLock()
t, err := r.createTask(config)
r.lock.RUnlock()
if err != nil {
return err
}
r.lock.Lock()
defer r.lock.Unlock()
_, ok := r.tasks[t.id]
if ok {
return ErrProcessExists
}
r.tasks[t.id] = t
// set filesystem cleanup rules
r.setCleanup(t.id, t.config)
if t.process.Order == "start" {
err := r.startProcess(t.id)
if err != nil {
delete(r.tasks, t.id)
return err
}
}
r.save()
return nil
}
// createTask creates a new task based on a process config.
func (r *restream) createTask(config *app.Config) (*task, error) {
id := strings.TrimSpace(config.ID)
if len(id) == 0 {
return nil, fmt.Errorf("an empty ID is not allowed")
}
config.FFVersion = "^" + r.ffmpeg.Skills().FFmpeg.Version
if v, err := semver.NewVersion(config.FFVersion); err == nil {
// Remove the patch level for the constraint
config.FFVersion = fmt.Sprintf("^%d.%d.0", v.Major(), v.Minor())
}
process := &app.Process{
ID: config.ID,
Reference: config.Reference,
Config: config.Clone(),
Order: "stop",
CreatedAt: time.Now().Unix(),
}
process.UpdatedAt = process.CreatedAt
if config.Autostart {
process.Order = "start"
}
t := &task{
id: config.ID,
reference: process.Reference,
process: process,
config: process.Config.Clone(),
logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"reference": process.Reference,
},
),
}
resolveStaticPlaceholders(t.config, r.replace)
err := r.resolveAddresses(r.tasks, t.config)
if err != nil {
return nil, err
}
{
// Validate config with all placeholders replaced. However, we need to take care
// that the config with the task keeps its dynamic placeholders for process starts.
config := t.config.Clone()
resolveDynamicPlaceholder(config, r.replace)
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
return nil, err
}
}
err = r.setPlayoutPorts(t)
if err != nil {
return nil, err
}
t.command = t.config.CreateCommand()
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
Timeout: time.Duration(t.config.Timeout) * time.Second,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()),
})
if err != nil {
return nil, err
}
t.ffmpeg = ffmpeg
t.valid = true
return t, nil
}
// onArgs is a callback that gets called by a process before it will be started.
// It evalutes the dynamic placeholders in a process config and returns the
// resulting command line to the process.
func (r *restream) onArgs(cfg *app.Config) func([]string) []string {
return func(args []string) []string {
config := cfg.Clone()
resolveDynamicPlaceholder(config, r.replace)
_, err := validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
return []string{}
}
return config.CreateCommand()
}
}
func (r *restream) setCleanup(id string, config *app.Config) {
rePrefix := regexp.MustCompile(`^(.+):`)
for _, output := range config.Output {
for _, c := range output.Cleanup {
matches := rePrefix.FindStringSubmatch(c.Pattern)
if matches == nil {
continue
}
name := matches[1]
// Support legacy names
if name == "diskfs" {
name = "disk"
} else if name == "memfs" {
name = "mem"
}
for _, fs := range r.fs.list {
if fs.Name() != name {
continue
}
pattern := rfs.Pattern{
Pattern: rePrefix.ReplaceAllString(c.Pattern, ""),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
}
fs.SetCleanup(id, []rfs.Pattern{
pattern,
})
break
}
}
}
}
func (r *restream) unsetCleanup(id string) {
for _, fs := range r.fs.list {
fs.UnsetCleanup(id)
}
}
func (r *restream) setPlayoutPorts(t *task) error {
r.unsetPlayoutPorts(t)
t.playout = make(map[string]int)
for i, input := range t.config.Input {
if !strings.HasPrefix(input.Address, "avstream:") && !strings.HasPrefix(input.Address, "playout:") {
continue
}
options := []string{}
skip := false
for _, o := range input.Options {
if skip {
continue
}
if o == "-playout_httpport" {
skip = true
continue
}
options = append(options, o)
}
if port, err := r.ffmpeg.GetPort(); err == nil {
options = append(options, "-playout_httpport", strconv.Itoa(port))
t.logger.WithFields(log.Fields{
"port": port,
"input": input.ID,
}).Debug().Log("Assinging playout port")
t.playout[input.ID] = port
} else if err != net.ErrNoPortrangerProvided {
return err
}
input.Options = options
t.config.Input[i] = input
}
return nil
}
func (r *restream) unsetPlayoutPorts(t *task) {
if t.playout == nil {
return
}
for _, port := range t.playout {
r.ffmpeg.PutPort(port)
}
t.playout = nil
}
// validateConfig verifies a process config, whether the accessed files (read and write) can be accessed
// based on the provided filesystems and the ffmpeg validators. Returns an error if somethingis wrong,
// otherwise nil and whether there is a disk filesystem involved.
func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmpeg) (bool, error) {
if len(config.Input) == 0 {
return false, fmt.Errorf("at least one input must be defined for the process '%s'", config.ID)
}
var err error
ids := map[string]bool{}
for _, io := range config.Input {
io.ID = strings.TrimSpace(io.ID)
if len(io.ID) == 0 {
return false, fmt.Errorf("empty input IDs are not allowed (process '%s')", config.ID)
}
if _, found := ids[io.ID]; found {
return false, fmt.Errorf("the input ID '%s' is already in use for the process `%s`", io.ID, config.ID)
}
ids[io.ID] = true
io.Address = strings.TrimSpace(io.Address)
if len(io.Address) == 0 {
return false, fmt.Errorf("the address for input '#%s:%s' must not be empty", config.ID, io.ID)
}
maxFails := 0
for _, fs := range fss {
basedir := "/"
if fs.Type() == "disk" {
basedir = fs.Metadata("base")
}
io.Address, err = validateInputAddress(io.Address, basedir, ffmpeg)
if err != nil {
maxFails++
}
}
if maxFails == len(fss) {
return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err)
}
}
if len(config.Output) == 0 {
return false, fmt.Errorf("at least one output must be defined for the process '#%s'", config.ID)
}
ids = map[string]bool{}
hasFiles := false
for _, io := range config.Output {
io.ID = strings.TrimSpace(io.ID)
if len(io.ID) == 0 {
return false, fmt.Errorf("empty output IDs are not allowed (process '%s')", config.ID)
}
if _, found := ids[io.ID]; found {
return false, fmt.Errorf("the output ID '%s' is already in use for the process `%s`", io.ID, config.ID)
}
ids[io.ID] = true
io.Address = strings.TrimSpace(io.Address)
if len(io.Address) == 0 {
return false, fmt.Errorf("the address for output '#%s:%s' must not be empty", config.ID, io.ID)
}
maxFails := 0
for _, fs := range fss {
basedir := "/"
if fs.Type() == "disk" {
basedir = fs.Metadata("base")
}
isFile := false
io.Address, isFile, err = validateOutputAddress(io.Address, basedir, ffmpeg)
if err != nil {
maxFails++
}
if isFile {
if fs.Type() == "disk" {
hasFiles = true
}
dir := filepath.Dir(strings.TrimPrefix(io.Address, "file:"+basedir))
fs.MkdirAll(dir, 0744)
}
}
if maxFails == len(fss) {
return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err)
}
}
return hasFiles, nil
}
// validateInputAddress checks whether the given input address is valid and is allowed to be used.
func validateInputAddress(address, basedir string, ffmpeg ffmpeg.FFmpeg) (string, error) {
if ok := url.HasScheme(address); ok {
if err := url.Validate(address); err != nil {
return address, err
}
}
if !ffmpeg.ValidateInputAddress(address) {
return address, fmt.Errorf("address is not allowed")
}
return address, nil
}
// validateOutputAddress checks whether the given output address is valid and is allowed to be used.
func validateOutputAddress(address, basedir string, ffmpeg ffmpeg.FFmpeg) (string, bool, error) {
// If the address contains a "|" or it starts with a "[", then assume that it
// is an address for the tee muxer.
if strings.Contains(address, "|") || strings.HasPrefix(address, "[") {
addresses := strings.Split(address, "|")
isFile := false
teeOptions := regexp.MustCompile(`^\[[^\]]*\]`)
for i, a := range addresses {
options := teeOptions.FindString(a)
a = teeOptions.ReplaceAllString(a, "")
va, file, err := validateOutputAddress(a, basedir, ffmpeg)
if err != nil {
return address, false, err
}
if file {
isFile = true
}
addresses[i] = options + va
}
return strings.Join(addresses, "|"), isFile, nil
}
address = strings.TrimPrefix(address, "file:")
if ok := url.HasScheme(address); ok {
if err := url.Validate(address); err != nil {
return address, false, err
}
if !ffmpeg.ValidateOutputAddress(address) {
return address, false, fmt.Errorf("address is not allowed")
}
return address, false, nil
}
if address == "-" {
return "pipe:", false, nil
}
address = filepath.Clean(address)
if !filepath.IsAbs(address) {
address = filepath.Join(basedir, address)
}
if strings.HasPrefix(address, "/dev/") {
if !ffmpeg.ValidateOutputAddress("file:" + address) {
return address, false, fmt.Errorf("address is not allowed")
}
return "file:" + address, false, nil
}
if !strings.HasPrefix(address, basedir) {
return address, false, fmt.Errorf("%s is not inside of %s", address, basedir)
}
if !ffmpeg.ValidateOutputAddress("file:" + address) {
return address, false, fmt.Errorf("address is not allowed")
}
return "file:" + address, true, nil
}
// resolveAddresses replaces the addresse reference from each input in a config with the actual address.
func (r *restream) resolveAddresses(tasks map[string]*task, config *app.Config) error {
for i, input := range config.Input {
// Resolve any references
address, err := r.resolveAddress(tasks, config.ID, input.Address)
if err != nil {
return fmt.Errorf("reference error for '#%s:%s': %w", config.ID, input.ID, err)
}
input.Address = address
config.Input[i] = input
}
return nil
}
// resolveAddress replaces the address reference with the actual address.
func (r *restream) resolveAddress(tasks map[string]*task, id, address string) (string, error) {
re := regexp.MustCompile(`^#(.+):output=(.+)`)
if len(address) == 0 {
return address, fmt.Errorf("empty address")
}
if address[0] != '#' {
return address, nil
}
matches := re.FindStringSubmatch(address)
if matches == nil {
return address, fmt.Errorf("invalid format (%s)", address)
}
if matches[1] == id {
return address, fmt.Errorf("self-reference not possible (%s)", address)
}
task, ok := tasks[matches[1]]
if !ok {
return address, fmt.Errorf("unknown process '%s' (%s)", matches[1], address)
}
for _, x := range task.config.Output {
if x.ID == matches[2] {
return x.Address, nil
}
}
return address, fmt.Errorf("the process '%s' has no outputs with the ID '%s' (%s)", matches[1], matches[2], address)
}
func (r *restream) UpdateProcess(id string, config *app.Config) error {
r.lock.Lock()
defer r.lock.Unlock()
t, err := r.createTask(config)
if err != nil {
return err
}
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
// This would require a major version jump
//t.process.CreatedAt = task.process.CreatedAt
t.process.UpdatedAt = time.Now().Unix()
task.parser.TransferReportHistory(t.parser)
t.process.Order = task.process.Order
if id != t.id {
_, ok := r.tasks[t.id]
if ok {
return ErrProcessExists
}
}
if err := r.stopProcess(id); err != nil {
return err
}
if err := r.deleteProcess(id); err != nil {
return err
}
r.tasks[t.id] = t
// set filesystem cleanup rules
r.setCleanup(t.id, t.config)
if t.process.Order == "start" {
r.startProcess(t.id)
}
r.save()
return nil
}
func (r *restream) GetProcessIDs(idpattern, refpattern string) []string {
r.lock.RLock()
defer r.lock.RUnlock()
if len(idpattern) == 0 && len(refpattern) == 0 {
ids := make([]string, len(r.tasks))
i := 0
for id := range r.tasks {
ids[i] = id
i++
}
return ids
}
idmap := map[string]int{}
count := 0
if len(idpattern) != 0 {
for id := range r.tasks {
match, err := glob.Match(idpattern, id)
if err != nil {
return nil
}
if !match {
continue
}
idmap[id]++
}
count++
}
if len(refpattern) != 0 {
for _, t := range r.tasks {
match, err := glob.Match(refpattern, t.reference)
if err != nil {
return nil
}
if !match {
continue
}
idmap[t.id]++
}
count++
}
ids := []string{}
for id, n := range idmap {
if n != count {
continue
}
ids = append(ids, id)
}
return ids
}
func (r *restream) GetProcess(id string) (*app.Process, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
if !ok {
return &app.Process{}, ErrUnknownProcess
}
process := task.process.Clone()
return process, nil
}
func (r *restream) DeleteProcess(id string) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.deleteProcess(id)
if err != nil {
return err
}
r.save()
return nil
}
func (r *restream) deleteProcess(id string) error {
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
if task.process.Order != "stop" {
return fmt.Errorf("the process with the ID '%s' is still running", id)
}
r.unsetPlayoutPorts(task)
r.unsetCleanup(id)
delete(r.tasks, id)
return nil
}
func (r *restream) StartProcess(id string) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.startProcess(id)
if err != nil {
return err
}
r.save()
return nil
}
func (r *restream) startProcess(id string) error {
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
if !task.valid {
return fmt.Errorf("invalid process definition")
}
if task.ffmpeg != nil {
status := task.ffmpeg.Status()
if task.process.Order == "start" && status.Order == "start" {
return nil
}
}
if r.maxProc > 0 && r.nProc >= r.maxProc {
return fmt.Errorf("max. number of running processes (%d) reached", r.maxProc)
}
task.process.Order = "start"
task.ffmpeg.Start()
r.nProc++
return nil
}
func (r *restream) StopProcess(id string) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.stopProcess(id)
if err != nil {
return err
}
r.save()
return nil
}
func (r *restream) stopProcess(id string) error {
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
if task.ffmpeg == nil {
return nil
}
status := task.ffmpeg.Status()
if task.process.Order == "stop" && status.Order == "stop" {
return nil
}
task.process.Order = "stop"
task.ffmpeg.Stop(true)
r.nProc--
return nil
}
func (r *restream) RestartProcess(id string) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.restartProcess(id)
}
func (r *restream) restartProcess(id string) error {
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
if !task.valid {
return fmt.Errorf("invalid process definition")
}
if task.process.Order == "stop" {
return nil
}
if task.ffmpeg != nil {
task.ffmpeg.Kill(true)
}
return nil
}
func (r *restream) ReloadProcess(id string) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.reloadProcess(id)
if err != nil {
return err
}
r.save()
return nil
}
func (r *restream) reloadProcess(id string) error {
t, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
t.valid = false
t.config = t.process.Config.Clone()
resolveStaticPlaceholders(t.config, r.replace)
err := r.resolveAddresses(r.tasks, t.config)
if err != nil {
return err
}
// Validate config with all placeholders replaced. However, we need to take care
// that the config with the task keeps its dynamic placeholders for process starts.
config := t.config.Clone()
resolveDynamicPlaceholder(config, r.replace)
t.usesDisk, err = validateConfig(config, r.fs.list, r.ffmpeg)
if err != nil {
return err
}
err = r.setPlayoutPorts(t)
if err != nil {
return err
}
t.command = t.config.CreateCommand()
order := "stop"
if t.process.Order == "start" {
order = "start"
r.stopProcess(id)
}
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference, t.config.LogPatterns)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
Timeout: time.Duration(t.config.Timeout) * time.Second,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()),
})
if err != nil {
return err
}
t.ffmpeg = ffmpeg
t.valid = true
if order == "start" {
r.startProcess(id)
}
return nil
}
func (r *restream) GetProcessState(id string) (*app.State, error) {
state := &app.State{}
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
if !ok {
return state, ErrUnknownProcess
}
if !task.valid {
return state, nil
}
status := task.ffmpeg.Status()
state.Order = task.process.Order
state.State = status.State
state.States.Marshal(status.States)
state.Time = status.Time.Unix()
state.Memory = status.Memory.Current
state.CPU = status.CPU.Current
state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds()
state.Reconnect = -1
state.Command = status.CommandArgs
state.LastLog = task.parser.LastLogline()
if status.Reconnect >= time.Duration(0) {
state.Reconnect = status.Reconnect.Round(10 * time.Millisecond).Seconds()
}
convertProgressFromParser(&state.Progress, task.parser.Progress())
for i, p := range state.Progress.Input {
if int(p.Index) >= len(task.process.Config.Input) {
continue
}
state.Progress.Input[i].ID = task.process.Config.Input[p.Index].ID
}
for i, p := range state.Progress.Output {
if int(p.Index) >= len(task.process.Config.Output) {
continue
}
state.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID
}
return state, nil
}
// convertProgressFromParser converts a ffmpeg/parse.Progress type into a restream/app.Progress type.
func convertProgressFromParser(progress *app.Progress, pprogress parse.Progress) {
progress.Frame = pprogress.Frame
progress.Packet = pprogress.Packet
progress.FPS = pprogress.FPS
progress.PPS = pprogress.PPS
progress.Quantizer = pprogress.Quantizer
progress.Size = pprogress.Size
progress.Time = pprogress.Time
progress.Bitrate = pprogress.Bitrate
progress.Speed = pprogress.Speed
progress.Drop = pprogress.Drop
progress.Dup = pprogress.Dup
for _, pinput := range pprogress.Input {
input := app.ProgressIO{
Address: pinput.Address,
Index: pinput.Index,
Stream: pinput.Stream,
Format: pinput.Format,
Type: pinput.Type,
Codec: pinput.Codec,
Coder: pinput.Coder,
Frame: pinput.Frame,
Keyframe: pinput.Keyframe,
Framerate: pinput.Framerate,
FPS: pinput.FPS,
Packet: pinput.Packet,
PPS: pinput.PPS,
Size: pinput.Size,
Bitrate: pinput.Bitrate,
Extradata: pinput.Extradata,
Pixfmt: pinput.Pixfmt,
Quantizer: pinput.Quantizer,
Width: pinput.Width,
Height: pinput.Height,
Sampling: pinput.Sampling,
Layout: pinput.Layout,
Channels: pinput.Channels,
AVstream: nil,
}
if pinput.AVstream != nil {
avstream := &app.AVstream{
Input: app.AVstreamIO{
State: pinput.AVstream.Input.State,
Packet: pinput.AVstream.Input.Packet,
Time: pinput.AVstream.Input.Time,
Size: pinput.AVstream.Input.Size,
},
Output: app.AVstreamIO{
State: pinput.AVstream.Output.State,
Packet: pinput.AVstream.Output.Packet,
Time: pinput.AVstream.Output.Time,
Size: pinput.AVstream.Output.Size,
},
Aqueue: pinput.AVstream.Aqueue,
Queue: pinput.AVstream.Queue,
Dup: pinput.AVstream.Dup,
Drop: pinput.AVstream.Drop,
Enc: pinput.AVstream.Enc,
Looping: pinput.AVstream.Looping,
Duplicating: pinput.AVstream.Duplicating,
GOP: pinput.AVstream.GOP,
}
input.AVstream = avstream
}
progress.Input = append(progress.Input, input)
}
for _, poutput := range pprogress.Output {
output := app.ProgressIO{
Address: poutput.Address,
Index: poutput.Index,
Stream: poutput.Stream,
Format: poutput.Format,
Type: poutput.Type,
Codec: poutput.Codec,
Coder: poutput.Coder,
Frame: poutput.Frame,
Keyframe: poutput.Keyframe,
Framerate: poutput.Framerate,
FPS: poutput.FPS,
Packet: poutput.Packet,
PPS: poutput.PPS,
Size: poutput.Size,
Bitrate: poutput.Bitrate,
Extradata: poutput.Extradata,
Pixfmt: poutput.Pixfmt,
Quantizer: poutput.Quantizer,
Width: poutput.Width,
Height: poutput.Height,
Sampling: poutput.Sampling,
Layout: poutput.Layout,
Channels: poutput.Channels,
AVstream: nil,
}
progress.Output = append(progress.Output, output)
}
}
func (r *restream) GetProcessLog(id string) (*app.Log, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
if !ok {
return &app.Log{}, ErrUnknownProcess
}
if !task.valid {
return &app.Log{}, nil
}
log := &app.Log{}
current := task.parser.Report()
log.CreatedAt = current.CreatedAt
log.Prelude = current.Prelude
log.Log = make([]app.LogLine, len(current.Log))
for i, line := range current.Log {
log.Log[i] = app.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
}
}
log.Matches = current.Matches
history := task.parser.ReportHistory()
for _, h := range history {
e := app.LogHistoryEntry{
LogEntry: app.LogEntry{
CreatedAt: h.CreatedAt,
Prelude: h.Prelude,
Matches: h.Matches,
},
ExitedAt: h.ExitedAt,
ExitState: h.ExitState,
Usage: app.ProcessUsage{
CPU: app.ProcessUsageCPU{
Average: h.Usage.CPU.Average,
Max: h.Usage.CPU.Max,
Limit: h.Usage.CPU.Limit,
},
Memory: app.ProcessUsageMemory{
Average: h.Usage.Memory.Average,
Max: h.Usage.Memory.Max,
Limit: h.Usage.Memory.Limit,
},
},
}
convertProgressFromParser(&e.Progress, h.Progress)
for i, p := range e.Progress.Input {
if int(p.Index) >= len(task.process.Config.Input) {
continue
}
e.Progress.Input[i].ID = task.process.Config.Input[p.Index].ID
}
for i, p := range e.Progress.Output {
if int(p.Index) >= len(task.process.Config.Output) {
continue
}
e.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID
}
e.LogEntry.Log = make([]app.LogLine, len(h.Log))
for i, line := range h.Log {
e.LogEntry.Log[i] = app.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
}
}
log.History = append(log.History, e)
}
return log, nil
}
func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.LogHistorySearchResult {
r.lock.RLock()
defer r.lock.RUnlock()
result := []app.LogHistorySearchResult{}
ids := r.GetProcessIDs(idpattern, refpattern)
for _, id := range ids {
task, ok := r.tasks[id]
if !ok {
continue
}
presult := task.parser.SearchReportHistory(state, from, to)
for _, f := range presult {
result = append(result, app.LogHistorySearchResult{
ProcessID: task.id,
Reference: task.reference,
ExitState: f.ExitState,
CreatedAt: f.CreatedAt,
ExitedAt: f.ExitedAt,
})
}
}
return result
}
func (r *restream) Probe(id string) app.Probe {
return r.ProbeWithTimeout(id, 20*time.Second)
}
func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe {
r.lock.RLock()
appprobe := app.Probe{}
task, ok := r.tasks[id]
if !ok {
appprobe.Log = append(appprobe.Log, fmt.Sprintf("Unknown process ID (%s)", id))
r.lock.RUnlock()
return appprobe
}
r.lock.RUnlock()
if !task.valid {
return appprobe
}
var command []string
// Copy global options
command = append(command, task.config.Options...)
for _, input := range task.config.Input {
// Add the resolved input to the process command
command = append(command, input.Options...)
command = append(command, "-i", input.Address)
}
prober := r.ffmpeg.NewProbeParser(task.logger)
var wg sync.WaitGroup
wg.Add(1)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: false,
ReconnectDelay: 0,
StaleTimeout: timeout,
Args: command,
Parser: prober,
Logger: task.logger,
OnExit: func(string) {
wg.Done()
},
})
if err != nil {
appprobe.Log = append(appprobe.Log, err.Error())
return appprobe
}
ffmpeg.Start()
wg.Wait()
convertProbeFromProber(&appprobe, prober.Probe())
return appprobe
}
// convertProbeFromProber converts a ffmpeg/probe.Probe type into an restream/app.Probe type.
func convertProbeFromProber(appprobe *app.Probe, pprobe probe.Probe) {
appprobe.Log = make([]string, len(pprobe.Log))
copy(appprobe.Log, pprobe.Log)
for _, s := range pprobe.Streams {
stream := app.ProbeIO{
Address: s.Address,
Index: s.Index,
Stream: s.Stream,
Language: s.Language,
Format: s.Format,
Type: s.Type,
Codec: s.Codec,
Coder: s.Coder,
Bitrate: s.Bitrate,
Duration: s.Duration,
Pixfmt: s.Pixfmt,
Width: s.Width,
Height: s.Height,
FPS: s.FPS,
Sampling: s.Sampling,
Layout: s.Layout,
Channels: s.Channels,
}
appprobe.Streams = append(appprobe.Streams, stream)
}
}
func (r *restream) Skills() skills.Skills {
return r.ffmpeg.Skills()
}
func (r *restream) ReloadSkills() error {
return r.ffmpeg.ReloadSkills()
}
func (r *restream) GetPlayout(id, inputid string) (string, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
if !ok {
return "", ErrUnknownProcess
}
if !task.valid {
return "", fmt.Errorf("invalid process definition")
}
port, ok := task.playout[inputid]
if !ok {
return "", fmt.Errorf("no playout for input ID '%s' and process '%s'", inputid, id)
}
return "127.0.0.1:" + strconv.Itoa(port), nil
}
var ErrMetadataKeyNotFound = errors.New("unknown key")
func (r *restream) SetProcessMetadata(id, key string, data interface{}) error {
r.lock.Lock()
defer r.lock.Unlock()
if len(key) == 0 {
return fmt.Errorf("a key for storing the data has to be provided")
}
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
if task.metadata == nil {
task.metadata = make(map[string]interface{})
}
if data == nil {
delete(task.metadata, key)
} else {
task.metadata[key] = data
}
if len(task.metadata) == 0 {
task.metadata = nil
}
r.save()
return nil
}
func (r *restream) GetProcessMetadata(id, key string) (interface{}, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[id]
if !ok {
return nil, ErrUnknownProcess
}
if len(key) == 0 {
return task.metadata, nil
}
data, ok := task.metadata[key]
if !ok {
return nil, ErrMetadataKeyNotFound
}
return data, nil
}
func (r *restream) SetMetadata(key string, data interface{}) error {
r.lock.Lock()
defer r.lock.Unlock()
if len(key) == 0 {
return fmt.Errorf("a key for storing the data has to be provided")
}
if r.metadata == nil {
r.metadata = make(map[string]interface{})
}
if data == nil {
delete(r.metadata, key)
} else {
r.metadata[key] = data
}
if len(r.metadata) == 0 {
r.metadata = nil
}
r.save()
return nil
}
func (r *restream) GetMetadata(key string) (interface{}, error) {
r.lock.RLock()
defer r.lock.RUnlock()
if len(key) == 0 {
if len(r.metadata) == 0 {
return nil, nil
}
return r.metadata, nil
}
data, ok := r.metadata[key]
if !ok {
return nil, ErrMetadataKeyNotFound
}
return data, nil
}
// resolveStaticPlaceholders replaces all placeholders in the config. The config will be modified in place.
func resolveStaticPlaceholders(config *app.Config, r replace.Replacer) {
vars := map[string]string{
"processid": config.ID,
"reference": config.Reference,
}
for i, option := range config.Options {
// Replace any known placeholders
option = r.Replace(option, "diskfs", "", vars, config, "global")
option = r.Replace(option, "fs:*", "", vars, config, "global")
config.Options[i] = option
}
// Resolving the given inputs
for i, input := range config.Input {
// Replace any known placeholders
input.ID = r.Replace(input.ID, "processid", config.ID, vars, config, "input")
input.ID = r.Replace(input.ID, "reference", config.Reference, vars, config, "input")
vars["inputid"] = input.ID
input.Address = r.Replace(input.Address, "inputid", input.ID, vars, config, "input")
input.Address = r.Replace(input.Address, "processid", config.ID, vars, config, "input")
input.Address = r.Replace(input.Address, "reference", config.Reference, vars, config, "input")
input.Address = r.Replace(input.Address, "diskfs", "", vars, config, "input")
input.Address = r.Replace(input.Address, "memfs", "", vars, config, "input")
input.Address = r.Replace(input.Address, "fs:*", "", vars, config, "input")
input.Address = r.Replace(input.Address, "rtmp", "", vars, config, "input")
input.Address = r.Replace(input.Address, "srt", "", vars, config, "input")
for j, option := range input.Options {
// Replace any known placeholders
option = r.Replace(option, "inputid", input.ID, vars, config, "input")
option = r.Replace(option, "processid", config.ID, vars, config, "input")
option = r.Replace(option, "reference", config.Reference, vars, config, "input")
option = r.Replace(option, "diskfs", "", vars, config, "input")
option = r.Replace(option, "memfs", "", vars, config, "input")
option = r.Replace(option, "fs:*", "", vars, config, "input")
input.Options[j] = option
}
delete(vars, "inputid")
config.Input[i] = input
}
// Resolving the given outputs
for i, output := range config.Output {
// Replace any known placeholders
output.ID = r.Replace(output.ID, "processid", config.ID, vars, config, "output")
output.ID = r.Replace(output.ID, "reference", config.Reference, vars, config, "output")
vars["outputid"] = output.ID
output.Address = r.Replace(output.Address, "outputid", output.ID, vars, config, "output")
output.Address = r.Replace(output.Address, "processid", config.ID, vars, config, "output")
output.Address = r.Replace(output.Address, "reference", config.Reference, vars, config, "output")
output.Address = r.Replace(output.Address, "diskfs", "", vars, config, "output")
output.Address = r.Replace(output.Address, "memfs", "", vars, config, "output")
output.Address = r.Replace(output.Address, "fs:*", "", vars, config, "output")
output.Address = r.Replace(output.Address, "rtmp", "", vars, config, "output")
output.Address = r.Replace(output.Address, "srt", "", vars, config, "output")
for j, option := range output.Options {
// Replace any known placeholders
option = r.Replace(option, "outputid", output.ID, vars, config, "output")
option = r.Replace(option, "processid", config.ID, vars, config, "output")
option = r.Replace(option, "reference", config.Reference, vars, config, "output")
option = r.Replace(option, "diskfs", "", vars, config, "output")
option = r.Replace(option, "memfs", "", vars, config, "output")
option = r.Replace(option, "fs:*", "", vars, config, "output")
output.Options[j] = option
}
for j, cleanup := range output.Cleanup {
// Replace any known placeholders
cleanup.Pattern = r.Replace(cleanup.Pattern, "outputid", output.ID, vars, config, "output")
cleanup.Pattern = r.Replace(cleanup.Pattern, "processid", config.ID, vars, config, "output")
cleanup.Pattern = r.Replace(cleanup.Pattern, "reference", config.Reference, vars, config, "output")
output.Cleanup[j] = cleanup
}
delete(vars, "outputid")
config.Output[i] = output
}
}
// resolveDynamicPlaceholder replaces placeholders in the config that should be replaced at process start.
// The config will be modified in place.
func resolveDynamicPlaceholder(config *app.Config, r replace.Replacer) {
vars := map[string]string{
"timestamp": time.Now().UTC().Format(time.RFC3339),
}
for i, option := range config.Options {
option = r.Replace(option, "date", "", vars, config, "global")
config.Options[i] = option
}
for i, input := range config.Input {
input.Address = r.Replace(input.Address, "date", "", vars, config, "input")
for j, option := range input.Options {
option = r.Replace(option, "date", "", vars, config, "input")
input.Options[j] = option
}
config.Input[i] = input
}
for i, output := range config.Output {
output.Address = r.Replace(output.Address, "date", "", vars, config, "output")
for j, option := range output.Options {
option = r.Replace(option, "date", "", vars, config, "output")
output.Options[j] = option
}
for j, cleanup := range output.Cleanup {
cleanup.Pattern = r.Replace(cleanup.Pattern, "date", "", vars, config, "output")
output.Cleanup[j] = cleanup
}
config.Output[i] = output
}
}