From 3a6281295cbfb2f0a05a063330e26bd281efb75f Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 24 Aug 2022 12:31:04 +0300 Subject: [PATCH] Use pattern to match placeholders for filesystems --- README.md | 21 +++++++++++---------- restream/restream.go | 11 ++++++----- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 0c1baf20..5cf9367e 100644 --- a/README.md +++ b/README.md @@ -652,16 +652,17 @@ A command is defined as: Currently supported placeholders are: -| Placeholder | Description | Location | -| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------- | -| `{diskfs}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` | -| `{memfs}` | Will be replace by the base URL of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` | -| `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | -| `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | -| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` | -| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` | -| `{rtmp}` | Will be replaced by the internal address of the RTMP server. Requires parameter `name` (name of the stream). | `input.address`, `output.address` | -| `{srt}` | Will be replaced by the internal address of the SRT server. Requires parameter `name` (name of the stream) and `mode` (either `publish` or `request`). | `input.address`, `output.address` | +| Placeholder | Description | Location | +| ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------- | +| `{diskfs}` or `{fs:disk}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` | +| `{memfs}` or `{fs:mem}` | Will be replaced by the base address of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` | +| `{fs:*}` | Will be replaces by the base address of the respective filesystem. | See `{memfs}` | +| `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | +| `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | +| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` | +| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` | +| `{rtmp}` | Will be replaced by the internal address of the RTMP server. Requires parameter `name` (name of the stream). | `input.address`, `output.address` | +| `{srt}` | Will be replaced by the internal address of the SRT server. Requires parameter `name` (name of the stream) and `mode` (either `publish` or `request`). | `input.address`, `output.address` | Before replacing the placeholders in the process config, all references (see below) will be resolved. diff --git a/restream/restream.go b/restream/restream.go index 50577bc6..606960cd 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -59,6 +59,7 @@ type Config struct { ID string Name string Store store.Store + Filesystems []fs.Filesystem DiskFS fs.Filesystem MemFS fs.Filesystem Replace replace.Replacer @@ -188,7 +189,7 @@ func (r *restream) Start() { ctx, cancel := context.WithCancel(context.Background()) r.fs.stopObserver = cancel - go r.observe(ctx, 10*time.Second) + go r.observe(ctx, r.fs.diskfs, 10*time.Second) r.stopOnce = sync.Once{} }) @@ -219,7 +220,7 @@ func (r *restream) Stop() { }) } -func (r *restream) observe(ctx context.Context, interval time.Duration) { +func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -228,14 +229,14 @@ func (r *restream) observe(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case <-ticker.C: - size, limit := r.fs.diskfs.Size() + size, limit := fs.Size() isFull := false if limit > 0 && size >= limit { isFull = true } if isFull { - // Stop all tasks that write to disk + // Stop all tasks that write to this filesystem r.lock.Lock() for id, t := range r.tasks { if !t.valid { @@ -250,7 +251,7 @@ func (r *restream) observe(ctx context.Context, interval time.Duration) { continue } - r.logger.Warn().Log("Shutting down because disk is full") + r.logger.Warn().Log("Shutting down because filesystem is full") r.stopProcess(id) } r.lock.Unlock()