From d930a91cbb6a8be739b50ac4b407cebdcb134a74 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 9 Aug 2023 09:30:17 +0300 Subject: [PATCH] Provide Probe() a config instead of a process ID --- app/import/import.go | 6 +-- http/graph/resolver/process.resolvers.go | 8 +++- http/handler/api/restream.go | 7 +++- restream/restream.go | 48 +++++++++++------------- restream/restream_test.go | 6 +-- 5 files changed, 37 insertions(+), 38 deletions(-) diff --git a/app/import/import.go b/app/import/import.go index 1e291a82..bbc5fec0 100644 --- a/app/import/import.go +++ b/app/import/import.go @@ -1464,11 +1464,7 @@ func probeInput(binary string, config app.Config) app.Probe { return app.Probe{} } - rs.AddProcess(&config) - - id := config.ProcessID() - probe := rs.Probe(id) - rs.DeleteProcess(id) + probe := rs.Probe(&config, 20*time.Second) return probe } diff --git a/http/graph/resolver/process.resolvers.go b/http/graph/resolver/process.resolvers.go index e71b51a0..07f4b72b 100644 --- a/http/graph/resolver/process.resolvers.go +++ b/http/graph/resolver/process.resolvers.go @@ -7,6 +7,7 @@ package resolver import ( "context" "fmt" + "time" "github.com/datarhei/core/v16/http/graph/models" "github.com/datarhei/core/v16/restream/app" @@ -64,7 +65,12 @@ func (r *queryResolver) Probe(ctx context.Context, id string, domain string) (*m Domain: domain, } - probe := r.Restream.Probe(tid) + process, err := r.Restream.GetProcess(tid) + if err != nil { + return nil, fmt.Errorf("not found") + } + + probe := r.Restream.Probe(process.Config, 20*time.Second) p := &models.Probe{} p.UnmarshalRestream(probe) diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 3919c524..be38844e 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -659,7 +659,12 @@ func (h *RestreamHandler) Probe(c echo.Context) error { Domain: domain, } - probe := h.restream.Probe(tid) + process, err := h.restream.GetProcess(tid) + if err != nil { + return api.Err(http.StatusNotFound, "") + } + + probe := h.restream.Probe(process.Config, 20*time.Second) apiprobe := api.Probe{} apiprobe.Unmarshal(&probe) diff --git a/restream/restream.go b/restream/restream.go index a7271854..dbba4ed1 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -58,10 +58,10 @@ type Restreamer interface { GetProcessLog(id app.ProcessID) (*app.Log, error) // Get the logs of a process SearchProcessLogHistory(idpattern, refpattern, state string, from, to *time.Time) []app.LogHistorySearchResult // Search the log history of all processes GetPlayout(id app.ProcessID, inputid string) (string, error) // Get the URL of the playout API for a process - Probe(id app.ProcessID) app.Probe // Probe a process - ProbeWithTimeout(id app.ProcessID, timeout time.Duration) app.Probe // Probe a process with specific timeout SetProcessMetadata(id app.ProcessID, key string, data interface{}) error // Set metatdata to a process GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) // Get previously set metadata from a process + + Probe(config *app.Config, timeout time.Duration) app.Probe // Probe a process with specific timeout } // Config is the required configuration for a new restreamer instance. @@ -85,8 +85,8 @@ type task struct { domain string reference string process *app.Process - config *app.Config - command []string // The actual command parameter for ffmpeg + config *app.Config // Process config with replaced static placeholders + command []string // The actual command parameter for ffmpeg ffmpeg process.Process parser parse.Parser playout map[string]int @@ -1830,40 +1830,29 @@ func (r *restream) SearchProcessLogHistory(idpattern, refpattern, state string, return result } -func (r *restream) Probe(id app.ProcessID) app.Probe { - return r.ProbeWithTimeout(id, 20*time.Second) -} - -func (r *restream) ProbeWithTimeout(id app.ProcessID, timeout time.Duration) app.Probe { +func (r *restream) Probe(config *app.Config, timeout time.Duration) app.Probe { appprobe := app.Probe{} - r.lock.RLock() + config = config.Clone() - task, ok := r.tasks[id] - if !ok { - appprobe.Log = append(appprobe.Log, fmt.Sprintf("Unknown process ID (%s)", id)) - r.lock.RUnlock() - return appprobe - } - - r.lock.RUnlock() - - if !task.valid { - return appprobe - } + resolveStaticPlaceholders(config, r.replace) + resolveDynamicPlaceholder(config, r.replace) var command []string // Copy global options - command = append(command, task.config.Options...) + command = append(command, config.Options...) - for _, input := range task.config.Input { + for _, input := range config.Input { // Add the resolved input to the process command command = append(command, input.Options...) command = append(command, "-i", input.Address) } - prober := r.ffmpeg.NewProbeParser(task.logger) + logbuffer := log.NewBufferWriter(log.Ldebug, 1000) + logger := log.New("").WithOutput(logbuffer) + + prober := r.ffmpeg.NewProbeParser(logger) var wg sync.WaitGroup @@ -1875,14 +1864,21 @@ func (r *restream) ProbeWithTimeout(id app.ProcessID, timeout time.Duration) app StaleTimeout: timeout, Args: command, Parser: prober, - Logger: task.logger, + Logger: logger, OnExit: func(string) { wg.Done() }, }) if err != nil { + formatter := log.NewConsoleFormatter(false) + + for _, e := range logbuffer.Events() { + appprobe.Log = append(appprobe.Log, strings.TrimSpace(formatter.String(e))) + } + appprobe.Log = append(appprobe.Log, err.Error()) + return appprobe } diff --git a/restream/restream_test.go b/restream/restream_test.go index ecf5cdf6..417278b2 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -627,11 +627,7 @@ func TestProbeProcess(t *testing.T) { require.NoError(t, err) process := getDummyProcess() - tid := app.ProcessID{ID: process.ID} - - rs.AddProcess(process) - - probe := rs.ProbeWithTimeout(tid, 5*time.Second) + probe := rs.Probe(process, 5*time.Second) require.Equal(t, 3, len(probe.Streams)) }