From b2cd8f713325d95f356873c955bf4b7b17c99d03 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 23 Jan 2023 17:09:55 +0100 Subject: [PATCH] Allow probe with individual timeout --- restream/restream.go | 55 +++++++++++++++++++++------------------ restream/restream_test.go | 13 +++++++++ 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/restream/restream.go b/restream/restream.go index 9f7c6ee1..f654cbf5 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -30,30 +30,31 @@ import ( // The Restreamer interface type Restreamer interface { - ID() string // ID of this instance - Name() string // Arbitrary name of this instance - CreatedAt() time.Time // Time of when this instance has been created - Start() // Start all processes that have a "start" order - Stop() // Stop all running process but keep their "start" order - AddProcess(config *app.Config) error // Add a new process - GetProcessIDs(idpattern, refpattern string) []string // Get a list of process IDs based on patterns for ID and reference - DeleteProcess(id string) error // Delete a process - UpdateProcess(id string, config *app.Config) error // Update a process - StartProcess(id string) error // Start a process - StopProcess(id string) error // Stop a process - RestartProcess(id string) error // Restart a process - ReloadProcess(id string) error // Reload a process - GetProcess(id string) (*app.Process, error) // Get a process - GetProcessState(id string) (*app.State, error) // Get the state of a process - GetProcessLog(id string) (*app.Log, error) // Get the logs of a process - GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process - Probe(id string) app.Probe // Probe a process - Skills() skills.Skills // Get the ffmpeg skills - ReloadSkills() error // Reload the ffmpeg skills - SetProcessMetadata(id, key string, data interface{}) error // Set metatdata to a process - GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process - SetMetadata(key string, data interface{}) error // Set general metadata - GetMetadata(key string) (interface{}, error) // Get previously set general metadata + ID() string // ID of this instance + Name() string // Arbitrary name of this instance + CreatedAt() time.Time // Time of when this instance has been created + Start() // Start all processes that have a "start" order + Stop() // Stop all running process but keep their "start" order + AddProcess(config *app.Config) error // Add a new process + GetProcessIDs(idpattern, refpattern string) []string // Get a list of process IDs based on patterns for ID and reference + DeleteProcess(id string) error // Delete a process + UpdateProcess(id string, config *app.Config) error // Update a process + StartProcess(id string) error // Start a process + StopProcess(id string) error // Stop a process + RestartProcess(id string) error // Restart a process + ReloadProcess(id string) error // Reload a process + GetProcess(id string) (*app.Process, error) // Get a process + GetProcessState(id string) (*app.State, error) // Get the state of a process + GetProcessLog(id string) (*app.Log, error) // Get the logs of a process + GetPlayout(id, inputid string) (string, error) // Get the URL of the playout API for a process + Probe(id string) app.Probe // Probe a process + ProbeWithTimeout(id string, timeout time.Duration) app.Probe // Probe a process with specific timeout + Skills() skills.Skills // Get the ffmpeg skills + ReloadSkills() error // Reload the ffmpeg skills + SetProcessMetadata(id, key string, data interface{}) error // Set metatdata to a process + GetProcessMetadata(id, key string) (interface{}, error) // Get previously set metadata from a process + SetMetadata(key string, data interface{}) error // Set general metadata + GetMetadata(key string) (interface{}, error) // Get previously set general metadata } // Config is the required configuration for a new restreamer instance. @@ -1251,6 +1252,10 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { } func (r *restream) Probe(id string) app.Probe { + return r.ProbeWithTimeout(id, 20*time.Second) +} + +func (r *restream) ProbeWithTimeout(id string, timeout time.Duration) app.Probe { r.lock.RLock() appprobe := app.Probe{} @@ -1288,7 +1293,7 @@ func (r *restream) Probe(id string) app.Probe { ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ Reconnect: false, ReconnectDelay: 0, - StaleTimeout: 20 * time.Second, + StaleTimeout: timeout, Command: command, Parser: prober, Logger: task.logger, diff --git a/restream/restream_test.go b/restream/restream_test.go index e4b0510d..11b08240 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -394,6 +394,19 @@ func TestReloadProcess(t *testing.T) { rs.StopProcess(process.ID) } +func TestProbeProcess(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + process := getDummyProcess() + + rs.AddProcess(process) + + probe := rs.ProbeWithTimeout(process.ID, 5*time.Second) + + require.Equal(t, 3, len(probe.Streams)) +} + func TestProcessMetadata(t *testing.T) { rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err)