Add v16.8.0

This commit is contained in:
Jan Stabenow
2022-06-03 17:21:52 +02:00
parent c8bebd95ef
commit 9746248c10
431 changed files with 14782 additions and 13944 deletions

View File

@@ -3,9 +3,10 @@ package app
import "github.com/datarhei/core/process"
type ConfigIOCleanup struct {
Pattern string `json:"pattern"`
MaxFiles uint `json:"max_files"`
MaxFileAge uint `json:"max_file_age_seconds"`
Pattern string `json:"pattern"`
MaxFiles uint `json:"max_files"`
MaxFileAge uint `json:"max_file_age_seconds"`
PurgeOnDelete bool `json:"purge_on_delete"`
}
type ConfigIO struct {

View File

@@ -16,9 +16,10 @@ type Config struct {
}
type Pattern struct {
Pattern string
MaxFiles uint
MaxFileAge time.Duration
Pattern string
MaxFiles uint
MaxFileAge time.Duration
PurgeOnDelete bool
}
type Filesystem interface {
@@ -116,7 +117,11 @@ func (fs *filesystem) UnsetCleanup(id string) {
fs.cleanupLock.Lock()
defer fs.cleanupLock.Unlock()
patterns, _ := fs.cleanupPatterns[id]
delete(fs.cleanupPatterns, id)
fs.purge(patterns)
}
func (fs *filesystem) cleanup() {
@@ -150,6 +155,20 @@ func (fs *filesystem) cleanup() {
}
}
func (fs *filesystem) purge(patterns []Pattern) {
for _, pattern := range patterns {
if !pattern.PurgeOnDelete {
continue
}
files := fs.Filesystem.List(pattern.Pattern)
for _, f := range files {
fs.logger.Debug().WithField("path", f.Name()).Log("Purging file")
fs.Filesystem.Delete(f.Name())
}
}
}
func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

View File

@@ -63,6 +63,7 @@ type Config struct {
}
type task struct {
valid bool
id string // ID of the task/process
reference string
process *app.Process
@@ -188,7 +189,9 @@ func (r *restream) Stop() {
// altering their order such that on a subsequent
// Start() they will get restarted.
for id, t := range r.tasks {
t.ffmpeg.Stop()
if t.ffmpeg != nil {
t.ffmpeg.Stop()
}
r.unsetCleanup(id)
}
@@ -221,6 +224,10 @@ func (r *restream) observe(ctx context.Context, interval time.Duration) {
// Stop all tasks that write to disk
r.lock.Lock()
for id, t := range r.tasks {
if !t.valid {
continue
}
if !t.usesDisk {
continue
}
@@ -244,8 +251,7 @@ func (r *restream) load() error {
return err
}
r.metadata = data.Metadata.System
r.tasks = make(map[string]*task)
tasks := make(map[string]*task)
for id, process := range data.Process {
t := &task{
@@ -259,11 +265,11 @@ func (r *restream) load() error {
// Replace all placeholders in the config
r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base())
r.tasks[id] = t
tasks[id] = t
}
for id, userdata := range data.Metadata.Process {
t, ok := r.tasks[id]
t, ok := tasks[id]
if !ok {
continue
}
@@ -274,20 +280,23 @@ func (r *restream) load() error {
// Now that all tasks are defined and all placeholders are
// replaced, we can resolve references and validate the
// inputs and outputs.
for _, t := range r.tasks {
err := r.resolveAddresses(t.config)
for _, t := range tasks {
err := r.resolveAddresses(tasks, t.config)
if err != nil {
return err
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring")
continue
}
t.usesDisk, err = r.validateConfig(t.config)
if err != nil {
return err
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring")
continue
}
err = r.setPlayoutPorts(t)
if err != nil {
return err
r.logger.Warn().WithField("id", t.id).WithError(err).Log("Ignoring")
continue
}
t.command = r.createCommand(t.config)
@@ -306,8 +315,12 @@ func (r *restream) load() error {
}
t.ffmpeg = ffmpeg
t.valid = true
}
r.tasks = tasks
r.metadata = data.Metadata.System
return nil
}
@@ -399,7 +412,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base())
err := r.resolveAddresses(t.config)
err := r.resolveAddresses(r.tasks, t.config)
if err != nil {
return nil, err
}
@@ -430,6 +443,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
}
t.ffmpeg = ffmpeg
t.valid = true
return t, nil
}
@@ -440,17 +454,19 @@ func (r *restream) setCleanup(id string, config *app.Config) {
if strings.HasPrefix(c.Pattern, "memfs:") {
r.fs.memfs.SetCleanup(id, []rfs.Pattern{
{
Pattern: strings.TrimPrefix(c.Pattern, "memfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
Pattern: strings.TrimPrefix(c.Pattern, "memfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
},
})
} else if strings.HasPrefix(c.Pattern, "diskfs:") {
r.fs.memfs.SetCleanup(id, []rfs.Pattern{
{
Pattern: strings.TrimPrefix(c.Pattern, "diskfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
Pattern: strings.TrimPrefix(c.Pattern, "diskfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
},
})
}
@@ -754,10 +770,10 @@ func (r *restream) validateOutputAddress(address, basedir string) (string, bool,
return "file:" + address, true, nil
}
func (r *restream) resolveAddresses(config *app.Config) error {
func (r *restream) resolveAddresses(tasks map[string]*task, config *app.Config) error {
for i, input := range config.Input {
// Resolve any references
address, err := r.resolveAddress(config.ID, input.Address)
address, err := r.resolveAddress(tasks, config.ID, input.Address)
if err != nil {
return fmt.Errorf("reference error for '#%s:%s': %w", config.ID, input.ID, err)
}
@@ -770,7 +786,7 @@ func (r *restream) resolveAddresses(config *app.Config) error {
return nil
}
func (r *restream) resolveAddress(id, address string) (string, error) {
func (r *restream) resolveAddress(tasks map[string]*task, id, address string) (string, error) {
re := regexp.MustCompile(`^#(.+):output=(.+)`)
if len(address) == 0 {
@@ -790,7 +806,7 @@ func (r *restream) resolveAddress(id, address string) (string, error) {
return address, fmt.Errorf("self-reference not possible (%s)", address)
}
task, ok := r.tasks[matches[1]]
task, ok := tasks[matches[1]]
if !ok {
return address, fmt.Errorf("unknown process '%s' (%s)", matches[1], address)
}
@@ -886,6 +902,10 @@ func (r *restream) startProcess(id string) error {
return fmt.Errorf("unknown process ID (%s)", id)
}
if !task.valid {
return fmt.Errorf("invalid process definition")
}
status := task.ffmpeg.Status()
if task.process.Order == "start" && status.Order == "start" {
@@ -953,6 +973,10 @@ func (r *restream) restartProcess(id string) error {
return fmt.Errorf("unknown process ID (%s)", id)
}
if !task.valid {
return fmt.Errorf("invalid process definition")
}
if task.process.Order == "stop" {
return nil
}
@@ -982,11 +1006,13 @@ func (r *restream) reloadProcess(id string) error {
return fmt.Errorf("unknown process ID (%s)", id)
}
t.valid = false
t.config = t.process.Config.Clone()
r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base())
err := r.resolveAddresses(t.config)
err := r.resolveAddresses(r.tasks, t.config)
if err != nil {
return err
}
@@ -1024,6 +1050,7 @@ func (r *restream) reloadProcess(id string) error {
}
t.ffmpeg = ffmpeg
t.valid = true
if order == "start" {
r.startProcess(id)
@@ -1043,6 +1070,10 @@ func (r *restream) GetProcessState(id string) (*app.State, error) {
return state, fmt.Errorf("unknown process ID (%s)", id)
}
if !task.valid {
return state, fmt.Errorf("invalid process definition")
}
status := task.ffmpeg.Status()
state.Order = task.process.Order
@@ -1100,6 +1131,10 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) {
return &app.Log{}, fmt.Errorf("unknown process ID (%s)", id)
}
if !task.valid {
return &app.Log{}, fmt.Errorf("invalid process definition")
}
log := &app.Log{}
current := task.parser.Report()
@@ -1150,6 +1185,10 @@ func (r *restream) Probe(id string) app.Probe {
r.lock.RUnlock()
if !task.valid {
return appprobe
}
var command []string
// Copy global options
@@ -1210,6 +1249,10 @@ func (r *restream) GetPlayout(id, inputid string) (string, error) {
return "", fmt.Errorf("unknown process ID '%s'", id)
}
if !task.valid {
return "", fmt.Errorf("Invalid process definition")
}
port, ok := task.playout[inputid]
if !ok {
return "", fmt.Errorf("no playout for input ID '%s' and process '%s'", inputid, id)