4 Commits

Author SHA1 Message Date
Ingo Oppermann
fc97a6f069 Fix tests and marshalling process config 2025-05-15 15:09:47 +02:00
Ingo Oppermann
cc0da080c6 Fix omitting empty cleanup rules 2025-05-15 12:17:17 +02:00
Ingo Oppermann
d5c03932b5 Validate process config before adding/updating the cluster DB 2025-05-15 11:10:10 +02:00
Ingo Oppermann
29d0e753ae Fix tests 2025-05-14 15:22:08 +02:00
17 changed files with 344 additions and 11 deletions

View File

@@ -238,6 +238,9 @@ func TestSynchronizeOrderStop(t *testing.T) {
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar"},
},
processOpStop{
nodeid: "node1",
processid: app.ProcessID{ID: "foobar"},
@@ -332,6 +335,9 @@ func TestSynchronizeOrderStart(t *testing.T) {
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar"},
},
processOpStart{
nodeid: "node1",
processid: app.ProcessID{ID: "foobar"},
@@ -438,6 +444,9 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar"},
},
processOpAdd{
nodeid: "node2",
config: &app.Config{
@@ -542,6 +551,9 @@ func TestSynchronizeAddReferenceAffinityMultiple(t *testing.T) {
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar1"},
},
processOpAdd{
nodeid: "node2",
config: &app.Config{
@@ -1120,7 +1132,11 @@ func TestSynchronizeNoUpdate(t *testing.T) {
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Empty(t, stack)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar"},
},
}, stack)
require.Equal(t, map[string]string{
"foobar@": "node1",
@@ -1377,7 +1393,11 @@ func TestSynchronizeWaitDisconnectedNode(t *testing.T) {
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Empty(t, stack)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar1"},
},
}, stack)
require.Equal(t, map[string]string{
"foobar1@": "node1",
@@ -1464,6 +1484,9 @@ func TestSynchronizeWaitDisconnectedNodeNoWish(t *testing.T) {
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar1"},
},
processOpAdd{
nodeid: "node1",
config: &app.Config{
@@ -1562,6 +1585,9 @@ func TestSynchronizeWaitDisconnectedNodeUnrealisticWish(t *testing.T) {
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar1"},
},
processOpAdd{
nodeid: "node1",
config: &app.Config{
@@ -1660,6 +1686,9 @@ func TestSynchronizeTimeoutDisconnectedNode(t *testing.T) {
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpNull{
processid: app.ProcessID{ID: "foobar1"},
},
processOpAdd{
nodeid: "node1",
config: &app.Config{

View File

@@ -411,6 +411,18 @@ func (n *Core) ProcessProbeConfig(config *app.Config) (api.Probe, error) {
return probe, err
}
func (n *Core) ProcessValidateConfig(config *app.Config) error {
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
return ErrNoPeer
}
return client.ProcessValidateConfig(config)
}
func (n *Core) ProcessList(options client.ProcessListOptions) ([]api.Process, error) {
n.lock.RLock()
client := n.client

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math/rand/v2"
"net/url"
"sort"
"sync"
@@ -493,6 +494,18 @@ func (p *Manager) FindNodeForResources(nodeid string, cpu float64, memory uint64
return ""
}
func (p *Manager) GetRandomNode() string {
p.lock.RLock()
defer p.lock.RUnlock()
nodes := []string{}
for nodeid := range p.nodes {
nodes = append(nodes, nodeid)
}
return nodes[rand.IntN(len(p.nodes))]
}
func (p *Manager) ProcessList(options client.ProcessListOptions) []api.Process {
processChan := make(chan []api.Process, 64)
processList := []api.Process{}
@@ -627,6 +640,15 @@ func (p *Manager) ProcessProbeConfig(nodeid string, config *app.Config) (api.Pro
return node.Core().ProcessProbeConfig(config)
}
func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error {
node, err := p.NodeGet(nodeid)
if err != nil {
return err
}
return node.Core().ProcessValidateConfig(config)
}
func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
eventChan := make(chan api.Event, 128)

View File

@@ -12,6 +12,12 @@ func (c *cluster) ProcessAdd(origin string, config *app.Config) error {
return c.forwarder.ProcessAdd(origin, config)
}
nodeid := c.manager.GetRandomNode()
err := c.manager.ProcessValidateConfig(nodeid, config)
if err != nil {
return err
}
cmd := &store.Command{
Operation: store.OpAddProcess,
Data: &store.CommandAddProcess{
@@ -57,6 +63,12 @@ func (c *cluster) ProcessUpdate(origin string, id app.ProcessID, config *app.Con
return c.forwarder.ProcessUpdate(origin, id, config)
}
nodeid := c.manager.GetRandomNode()
err := c.manager.ProcessValidateConfig(nodeid, config)
if err != nil {
return err
}
cmd := &store.Command{
Operation: store.OpUpdateProcess,
Data: &store.CommandUpdateProcess{

View File

@@ -3597,7 +3597,7 @@ const docTemplate = `{
"tags": [
"v16.?.?"
],
"summary": "Add a new process",
"summary": "Probe a process config",
"operationId": "process-3-probe-config",
"parameters": [
{
@@ -3632,6 +3632,58 @@ const docTemplate = `{
}
}
},
"/api/v3/process/validate": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Probe a process to get a detailed stream information on the inputs.",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Validate a process config",
"operationId": "process-3-validate-config",
"parameters": [
{
"description": "Process config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ProcessConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.Probe"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/process/{id}": {
"get": {
"security": [

View File

@@ -3590,7 +3590,7 @@
"tags": [
"v16.?.?"
],
"summary": "Add a new process",
"summary": "Probe a process config",
"operationId": "process-3-probe-config",
"parameters": [
{
@@ -3625,6 +3625,58 @@
}
}
},
"/api/v3/process/validate": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Probe a process to get a detailed stream information on the inputs.",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Validate a process config",
"operationId": "process-3-validate-config",
"parameters": [
{
"description": "Process config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ProcessConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.Probe"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/process/{id}": {
"get": {
"security": [

View File

@@ -5964,7 +5964,40 @@ paths:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Add a new process
summary: Probe a process config
tags:
- v16.?.?
/api/v3/process/validate:
post:
consumes:
- application/json
description: Probe a process to get a detailed stream information on the inputs.
operationId: process-3-validate-config
parameters:
- description: Process config
in: body
name: config
required: true
schema:
$ref: '#/definitions/api.ProcessConfig'
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.Probe'
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"403":
description: Forbidden
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Validate a process config
tags:
- v16.?.?
/api/v3/report/process:

View File

@@ -144,7 +144,7 @@ type ProcessConfigIO struct {
ID string `json:"id"`
Address string `json:"address" validate:"required" jsonschema:"minLength=1"`
Options []string `json:"options"`
Cleanup []ProcessConfigIOCleanup `json:"cleanup,omitempty"`
Cleanup []ProcessConfigIOCleanup `json:"cleanup"`
}
type ProcessConfigIOCleanup struct {
@@ -309,6 +309,7 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config, metadata map[string]interface
io := ProcessConfigIO{
ID: x.ID,
Address: x.Address,
Cleanup: []ProcessConfigIOCleanup{},
}
io.Options = make([]string, len(x.Options))
@@ -321,6 +322,7 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config, metadata map[string]interface
io := ProcessConfigIO{
ID: x.ID,
Address: x.Address,
Cleanup: []ProcessConfigIOCleanup{},
}
io.Options = make([]string, len(x.Options))

View File

@@ -67,6 +67,7 @@ type RestClient interface {
ProcessCommand(id app.ProcessID, command string) error // PUT /v3/process/{id}/command
ProcessProbe(id app.ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe
ProcessProbeConfig(config *app.Config) (api.Probe, error) // POST /v3/process/probe
ProcessValidateConfig(p *app.Config) error // POST /v3/process/validate
ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) // GET /v3/process/{id}/config
ProcessReport(id app.ProcessID) (api.ProcessReport, error) // GET /v3/process/{id}/report
ProcessReportSet(id app.ProcessID, report *app.Report) error // PUT /v3/process/{id}/report

View File

@@ -226,6 +226,24 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) {
return probe, err
}
func (r *restclient) ProcessValidateConfig(p *app.Config) error {
buf := mem.Get()
defer mem.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, nil)
e := json.NewEncoder(buf)
e.Encode(config)
_, err := r.call("POST", "/v3/process/validate", nil, nil, "application/json", buf.Reader())
if err != nil {
return err
}
return nil
}
func (r *restclient) ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) {
var p api.ProcessConfig

View File

@@ -29,4 +29,4 @@
"reconnect": true,
"reconnect_delay_seconds": 10,
"stale_timeout_seconds": 10
}
}

View File

@@ -28,4 +28,4 @@
"reconnect": true,
"reconnect_delay_seconds": 10,
"stale_timeout_seconds": 10
}
}

View File

@@ -29,4 +29,4 @@
"reconnect": true,
"reconnect_delay_seconds": 10,
"stale_timeout_seconds": 10
}
}

View File

@@ -303,6 +303,7 @@ func (h *ProcessHandler) Update(c echo.Context) error {
process := api.ProcessConfig{
ID: id,
Owner: ctxuser,
Domain: domain,
Type: "ffmpeg",
Autostart: true,
}
@@ -353,7 +354,9 @@ func (h *ProcessHandler) Update(c echo.Context) error {
h.restream.SetProcessMetadata(tid, key, data)
}
return c.JSON(http.StatusOK, process)
p, _ := h.getProcess(tid, newFilter("config"))
return c.JSON(http.StatusOK, p.Config)
}
// Command issues a command to a process
@@ -751,7 +754,7 @@ func (h *ProcessHandler) Probe(c echo.Context) error {
}
// ProbeConfig probes a process
// @Summary Add a new process
// @Summary Probe a process config
// @Description Probe a process to get a detailed stream information on the inputs.
// @Tags v16.?.?
// @ID process-3-probe-config
@@ -797,6 +800,49 @@ func (h *ProcessHandler) ProbeConfig(c echo.Context) error {
return c.JSON(http.StatusOK, apiprobe)
}
// ValaidateConfig validates a config
// @Summary Validate a process config
// @Description Probe a process to get a detailed stream information on the inputs.
// @Tags v16.?.?
// @ID process-3-validate-config
// @Accept json
// @Produce json
// @Param config body api.ProcessConfig true "Process config"
// @Success 200 {object} api.Probe
// @Failure 400 {object} api.Error
// @Failure 403 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/validate [post]
func (h *ProcessHandler) ValidateConfig(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
process := api.ProcessConfig{
Owner: ctxuser,
Type: "ffmpeg",
}
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
if !h.iam.Enforce(ctxuser, process.Domain, "process", process.ID, "write") {
return api.Err(http.StatusForbidden, "", "You are not allowed to validate this process, check the domain and process ID")
}
if process.Type != "ffmpeg" {
return api.Err(http.StatusBadRequest, "", "unsupported process type, supported process types are: ffmpeg")
}
config, _ := process.Marshal()
err := h.restream.Validate(config)
if err != nil {
return api.Err(http.StatusBadRequest, "", "invalid config: %s", err.Error())
}
return c.JSON(http.StatusOK, process)
}
// Skills returns the detected FFmpeg capabilities
// @Summary FFmpeg capabilities
// @Description List all detected FFmpeg capabilities.

View File

@@ -641,6 +641,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
if !s.readOnly {
v3.POST("/process/probe", s.v3handler.process.ProbeConfig)
v3.POST("/process/validate", s.v3handler.process.ValidateConfig)
v3.GET("/process/:id/probe", s.v3handler.process.Probe)
v3.POST("/process", s.v3handler.process.Add)
v3.PUT("/process/:id", s.v3handler.process.Update)

View File

@@ -66,6 +66,34 @@ func TestConfigHash(t *testing.T) {
require.False(t, bytes.Equal(hash1, hash2))
}
func TestConfigIOHash(t *testing.T) {
io1 := ConfigIO{
ID: "0",
Address: "-",
Options: []string{
"-codec",
"copy",
"-f",
"null",
},
Cleanup: []ConfigIOCleanup{},
}
io2 := ConfigIO{
ID: "0",
Address: "-",
Options: []string{
"-codec",
"copy",
"-f",
"null",
},
Cleanup: nil,
}
require.Equal(t, io1.HashString(), io2.HashString())
}
func TestProcessUsageCPU(t *testing.T) {
original := parse.UsageCPU{
NCPU: 1.5,

View File

@@ -58,6 +58,7 @@ type Restreamer interface {
GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) // Get previously set metadata from a process
Probe(config *app.Config, timeout time.Duration) app.Probe // Probe a process with specific timeout
Validate(config *app.Config) error // Validate a process config
}
// Config is the required configuration for a new restreamer instance.
@@ -1920,3 +1921,27 @@ func hasPlaceholder(config *app.Config, r replace.Replacer, placeholder string)
return false
}
func (r *restream) Validate(config *app.Config) error {
cfg := config.Clone()
resolveStaticPlaceholders(cfg, r.replace)
err := r.resolveAddresses(r.tasks, cfg)
if err != nil {
return err
}
resolveDynamicPlaceholder(cfg, r.replace, map[string]string{
"hwdevice": "0",
}, map[string]string{
"timestamp": time.Now().UTC().Format(time.RFC3339),
})
_, err = validateConfig(cfg, r.fs.list, r.ffmpeg)
if err != nil {
return err
}
return nil
}