mirror of
https://github.com/datarhei/core.git
synced 2025-09-26 20:11:29 +08:00
Compare commits
4 Commits
88f5099972
...
fc97a6f069
Author | SHA1 | Date | |
---|---|---|---|
![]() |
fc97a6f069 | ||
![]() |
cc0da080c6 | ||
![]() |
d5c03932b5 | ||
![]() |
29d0e753ae |
@@ -238,6 +238,9 @@ func TestSynchronizeOrderStop(t *testing.T) {
|
||||
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
},
|
||||
processOpStop{
|
||||
nodeid: "node1",
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
@@ -332,6 +335,9 @@ func TestSynchronizeOrderStart(t *testing.T) {
|
||||
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
},
|
||||
processOpStart{
|
||||
nodeid: "node1",
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
@@ -438,6 +444,9 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
|
||||
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
},
|
||||
processOpAdd{
|
||||
nodeid: "node2",
|
||||
config: &app.Config{
|
||||
@@ -542,6 +551,9 @@ func TestSynchronizeAddReferenceAffinityMultiple(t *testing.T) {
|
||||
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar1"},
|
||||
},
|
||||
processOpAdd{
|
||||
nodeid: "node2",
|
||||
config: &app.Config{
|
||||
@@ -1120,7 +1132,11 @@ func TestSynchronizeNoUpdate(t *testing.T) {
|
||||
|
||||
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Empty(t, stack)
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar"},
|
||||
},
|
||||
}, stack)
|
||||
|
||||
require.Equal(t, map[string]string{
|
||||
"foobar@": "node1",
|
||||
@@ -1377,7 +1393,11 @@ func TestSynchronizeWaitDisconnectedNode(t *testing.T) {
|
||||
|
||||
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Empty(t, stack)
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar1"},
|
||||
},
|
||||
}, stack)
|
||||
|
||||
require.Equal(t, map[string]string{
|
||||
"foobar1@": "node1",
|
||||
@@ -1464,6 +1484,9 @@ func TestSynchronizeWaitDisconnectedNodeNoWish(t *testing.T) {
|
||||
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar1"},
|
||||
},
|
||||
processOpAdd{
|
||||
nodeid: "node1",
|
||||
config: &app.Config{
|
||||
@@ -1562,6 +1585,9 @@ func TestSynchronizeWaitDisconnectedNodeUnrealisticWish(t *testing.T) {
|
||||
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar1"},
|
||||
},
|
||||
processOpAdd{
|
||||
nodeid: "node1",
|
||||
config: &app.Config{
|
||||
@@ -1660,6 +1686,9 @@ func TestSynchronizeTimeoutDisconnectedNode(t *testing.T) {
|
||||
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
|
||||
|
||||
require.Equal(t, []interface{}{
|
||||
processOpNull{
|
||||
processid: app.ProcessID{ID: "foobar1"},
|
||||
},
|
||||
processOpAdd{
|
||||
nodeid: "node1",
|
||||
config: &app.Config{
|
||||
|
@@ -411,6 +411,18 @@ func (n *Core) ProcessProbeConfig(config *app.Config) (api.Probe, error) {
|
||||
return probe, err
|
||||
}
|
||||
|
||||
func (n *Core) ProcessValidateConfig(config *app.Config) error {
|
||||
n.lock.RLock()
|
||||
client := n.client
|
||||
n.lock.RUnlock()
|
||||
|
||||
if client == nil {
|
||||
return ErrNoPeer
|
||||
}
|
||||
|
||||
return client.ProcessValidateConfig(config)
|
||||
}
|
||||
|
||||
func (n *Core) ProcessList(options client.ProcessListOptions) ([]api.Process, error) {
|
||||
n.lock.RLock()
|
||||
client := n.client
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand/v2"
|
||||
"net/url"
|
||||
"sort"
|
||||
"sync"
|
||||
@@ -493,6 +494,18 @@ func (p *Manager) FindNodeForResources(nodeid string, cpu float64, memory uint64
|
||||
return ""
|
||||
}
|
||||
|
||||
func (p *Manager) GetRandomNode() string {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
nodes := []string{}
|
||||
for nodeid := range p.nodes {
|
||||
nodes = append(nodes, nodeid)
|
||||
}
|
||||
|
||||
return nodes[rand.IntN(len(p.nodes))]
|
||||
}
|
||||
|
||||
func (p *Manager) ProcessList(options client.ProcessListOptions) []api.Process {
|
||||
processChan := make(chan []api.Process, 64)
|
||||
processList := []api.Process{}
|
||||
@@ -627,6 +640,15 @@ func (p *Manager) ProcessProbeConfig(nodeid string, config *app.Config) (api.Pro
|
||||
return node.Core().ProcessProbeConfig(config)
|
||||
}
|
||||
|
||||
func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error {
|
||||
node, err := p.NodeGet(nodeid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return node.Core().ProcessValidateConfig(config)
|
||||
}
|
||||
|
||||
func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
|
||||
eventChan := make(chan api.Event, 128)
|
||||
|
||||
|
@@ -12,6 +12,12 @@ func (c *cluster) ProcessAdd(origin string, config *app.Config) error {
|
||||
return c.forwarder.ProcessAdd(origin, config)
|
||||
}
|
||||
|
||||
nodeid := c.manager.GetRandomNode()
|
||||
err := c.manager.ProcessValidateConfig(nodeid, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd := &store.Command{
|
||||
Operation: store.OpAddProcess,
|
||||
Data: &store.CommandAddProcess{
|
||||
@@ -57,6 +63,12 @@ func (c *cluster) ProcessUpdate(origin string, id app.ProcessID, config *app.Con
|
||||
return c.forwarder.ProcessUpdate(origin, id, config)
|
||||
}
|
||||
|
||||
nodeid := c.manager.GetRandomNode()
|
||||
err := c.manager.ProcessValidateConfig(nodeid, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd := &store.Command{
|
||||
Operation: store.OpUpdateProcess,
|
||||
Data: &store.CommandUpdateProcess{
|
||||
|
54
docs/docs.go
54
docs/docs.go
@@ -3597,7 +3597,7 @@ const docTemplate = `{
|
||||
"tags": [
|
||||
"v16.?.?"
|
||||
],
|
||||
"summary": "Add a new process",
|
||||
"summary": "Probe a process config",
|
||||
"operationId": "process-3-probe-config",
|
||||
"parameters": [
|
||||
{
|
||||
@@ -3632,6 +3632,58 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/process/validate": {
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Probe a process to get a detailed stream information on the inputs.",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v16.?.?"
|
||||
],
|
||||
"summary": "Validate a process config",
|
||||
"operationId": "process-3-validate-config",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "Process config",
|
||||
"name": "config",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ProcessConfig"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.Probe"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad Request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.Error"
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "Forbidden",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/process/{id}": {
|
||||
"get": {
|
||||
"security": [
|
||||
|
@@ -3590,7 +3590,7 @@
|
||||
"tags": [
|
||||
"v16.?.?"
|
||||
],
|
||||
"summary": "Add a new process",
|
||||
"summary": "Probe a process config",
|
||||
"operationId": "process-3-probe-config",
|
||||
"parameters": [
|
||||
{
|
||||
@@ -3625,6 +3625,58 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/process/validate": {
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Probe a process to get a detailed stream information on the inputs.",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v16.?.?"
|
||||
],
|
||||
"summary": "Validate a process config",
|
||||
"operationId": "process-3-validate-config",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "Process config",
|
||||
"name": "config",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ProcessConfig"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.Probe"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad Request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.Error"
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "Forbidden",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/process/{id}": {
|
||||
"get": {
|
||||
"security": [
|
||||
|
@@ -5964,7 +5964,40 @@ paths:
|
||||
$ref: '#/definitions/api.Error'
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
summary: Add a new process
|
||||
summary: Probe a process config
|
||||
tags:
|
||||
- v16.?.?
|
||||
/api/v3/process/validate:
|
||||
post:
|
||||
consumes:
|
||||
- application/json
|
||||
description: Probe a process to get a detailed stream information on the inputs.
|
||||
operationId: process-3-validate-config
|
||||
parameters:
|
||||
- description: Process config
|
||||
in: body
|
||||
name: config
|
||||
required: true
|
||||
schema:
|
||||
$ref: '#/definitions/api.ProcessConfig'
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
schema:
|
||||
$ref: '#/definitions/api.Probe'
|
||||
"400":
|
||||
description: Bad Request
|
||||
schema:
|
||||
$ref: '#/definitions/api.Error'
|
||||
"403":
|
||||
description: Forbidden
|
||||
schema:
|
||||
$ref: '#/definitions/api.Error'
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
summary: Validate a process config
|
||||
tags:
|
||||
- v16.?.?
|
||||
/api/v3/report/process:
|
||||
|
@@ -144,7 +144,7 @@ type ProcessConfigIO struct {
|
||||
ID string `json:"id"`
|
||||
Address string `json:"address" validate:"required" jsonschema:"minLength=1"`
|
||||
Options []string `json:"options"`
|
||||
Cleanup []ProcessConfigIOCleanup `json:"cleanup,omitempty"`
|
||||
Cleanup []ProcessConfigIOCleanup `json:"cleanup"`
|
||||
}
|
||||
|
||||
type ProcessConfigIOCleanup struct {
|
||||
@@ -309,6 +309,7 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config, metadata map[string]interface
|
||||
io := ProcessConfigIO{
|
||||
ID: x.ID,
|
||||
Address: x.Address,
|
||||
Cleanup: []ProcessConfigIOCleanup{},
|
||||
}
|
||||
|
||||
io.Options = make([]string, len(x.Options))
|
||||
@@ -321,6 +322,7 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config, metadata map[string]interface
|
||||
io := ProcessConfigIO{
|
||||
ID: x.ID,
|
||||
Address: x.Address,
|
||||
Cleanup: []ProcessConfigIOCleanup{},
|
||||
}
|
||||
|
||||
io.Options = make([]string, len(x.Options))
|
||||
|
@@ -67,6 +67,7 @@ type RestClient interface {
|
||||
ProcessCommand(id app.ProcessID, command string) error // PUT /v3/process/{id}/command
|
||||
ProcessProbe(id app.ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe
|
||||
ProcessProbeConfig(config *app.Config) (api.Probe, error) // POST /v3/process/probe
|
||||
ProcessValidateConfig(p *app.Config) error // POST /v3/process/validate
|
||||
ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) // GET /v3/process/{id}/config
|
||||
ProcessReport(id app.ProcessID) (api.ProcessReport, error) // GET /v3/process/{id}/report
|
||||
ProcessReportSet(id app.ProcessID, report *app.Report) error // PUT /v3/process/{id}/report
|
||||
|
@@ -226,6 +226,24 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) {
|
||||
return probe, err
|
||||
}
|
||||
|
||||
func (r *restclient) ProcessValidateConfig(p *app.Config) error {
|
||||
buf := mem.Get()
|
||||
defer mem.Put(buf)
|
||||
|
||||
config := api.ProcessConfig{}
|
||||
config.Unmarshal(p, nil)
|
||||
|
||||
e := json.NewEncoder(buf)
|
||||
e.Encode(config)
|
||||
|
||||
_, err := r.call("POST", "/v3/process/validate", nil, nil, "application/json", buf.Reader())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *restclient) ProcessConfig(id app.ProcessID) (api.ProcessConfig, error) {
|
||||
var p api.ProcessConfig
|
||||
|
||||
|
@@ -29,4 +29,4 @@
|
||||
"reconnect": true,
|
||||
"reconnect_delay_seconds": 10,
|
||||
"stale_timeout_seconds": 10
|
||||
}
|
||||
}
|
@@ -28,4 +28,4 @@
|
||||
"reconnect": true,
|
||||
"reconnect_delay_seconds": 10,
|
||||
"stale_timeout_seconds": 10
|
||||
}
|
||||
}
|
@@ -29,4 +29,4 @@
|
||||
"reconnect": true,
|
||||
"reconnect_delay_seconds": 10,
|
||||
"stale_timeout_seconds": 10
|
||||
}
|
||||
}
|
@@ -303,6 +303,7 @@ func (h *ProcessHandler) Update(c echo.Context) error {
|
||||
process := api.ProcessConfig{
|
||||
ID: id,
|
||||
Owner: ctxuser,
|
||||
Domain: domain,
|
||||
Type: "ffmpeg",
|
||||
Autostart: true,
|
||||
}
|
||||
@@ -353,7 +354,9 @@ func (h *ProcessHandler) Update(c echo.Context) error {
|
||||
h.restream.SetProcessMetadata(tid, key, data)
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, process)
|
||||
p, _ := h.getProcess(tid, newFilter("config"))
|
||||
|
||||
return c.JSON(http.StatusOK, p.Config)
|
||||
}
|
||||
|
||||
// Command issues a command to a process
|
||||
@@ -751,7 +754,7 @@ func (h *ProcessHandler) Probe(c echo.Context) error {
|
||||
}
|
||||
|
||||
// ProbeConfig probes a process
|
||||
// @Summary Add a new process
|
||||
// @Summary Probe a process config
|
||||
// @Description Probe a process to get a detailed stream information on the inputs.
|
||||
// @Tags v16.?.?
|
||||
// @ID process-3-probe-config
|
||||
@@ -797,6 +800,49 @@ func (h *ProcessHandler) ProbeConfig(c echo.Context) error {
|
||||
return c.JSON(http.StatusOK, apiprobe)
|
||||
}
|
||||
|
||||
// ValaidateConfig validates a config
|
||||
// @Summary Validate a process config
|
||||
// @Description Probe a process to get a detailed stream information on the inputs.
|
||||
// @Tags v16.?.?
|
||||
// @ID process-3-validate-config
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param config body api.ProcessConfig true "Process config"
|
||||
// @Success 200 {object} api.Probe
|
||||
// @Failure 400 {object} api.Error
|
||||
// @Failure 403 {object} api.Error
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/process/validate [post]
|
||||
func (h *ProcessHandler) ValidateConfig(c echo.Context) error {
|
||||
ctxuser := util.DefaultContext(c, "user", "")
|
||||
|
||||
process := api.ProcessConfig{
|
||||
Owner: ctxuser,
|
||||
Type: "ffmpeg",
|
||||
}
|
||||
|
||||
if err := util.ShouldBindJSON(c, &process); err != nil {
|
||||
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
|
||||
}
|
||||
|
||||
if !h.iam.Enforce(ctxuser, process.Domain, "process", process.ID, "write") {
|
||||
return api.Err(http.StatusForbidden, "", "You are not allowed to validate this process, check the domain and process ID")
|
||||
}
|
||||
|
||||
if process.Type != "ffmpeg" {
|
||||
return api.Err(http.StatusBadRequest, "", "unsupported process type, supported process types are: ffmpeg")
|
||||
}
|
||||
|
||||
config, _ := process.Marshal()
|
||||
|
||||
err := h.restream.Validate(config)
|
||||
if err != nil {
|
||||
return api.Err(http.StatusBadRequest, "", "invalid config: %s", err.Error())
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, process)
|
||||
}
|
||||
|
||||
// Skills returns the detected FFmpeg capabilities
|
||||
// @Summary FFmpeg capabilities
|
||||
// @Description List all detected FFmpeg capabilities.
|
||||
|
@@ -641,6 +641,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
|
||||
|
||||
if !s.readOnly {
|
||||
v3.POST("/process/probe", s.v3handler.process.ProbeConfig)
|
||||
v3.POST("/process/validate", s.v3handler.process.ValidateConfig)
|
||||
v3.GET("/process/:id/probe", s.v3handler.process.Probe)
|
||||
v3.POST("/process", s.v3handler.process.Add)
|
||||
v3.PUT("/process/:id", s.v3handler.process.Update)
|
||||
|
@@ -66,6 +66,34 @@ func TestConfigHash(t *testing.T) {
|
||||
require.False(t, bytes.Equal(hash1, hash2))
|
||||
}
|
||||
|
||||
func TestConfigIOHash(t *testing.T) {
|
||||
io1 := ConfigIO{
|
||||
ID: "0",
|
||||
Address: "-",
|
||||
Options: []string{
|
||||
"-codec",
|
||||
"copy",
|
||||
"-f",
|
||||
"null",
|
||||
},
|
||||
Cleanup: []ConfigIOCleanup{},
|
||||
}
|
||||
|
||||
io2 := ConfigIO{
|
||||
ID: "0",
|
||||
Address: "-",
|
||||
Options: []string{
|
||||
"-codec",
|
||||
"copy",
|
||||
"-f",
|
||||
"null",
|
||||
},
|
||||
Cleanup: nil,
|
||||
}
|
||||
|
||||
require.Equal(t, io1.HashString(), io2.HashString())
|
||||
}
|
||||
|
||||
func TestProcessUsageCPU(t *testing.T) {
|
||||
original := parse.UsageCPU{
|
||||
NCPU: 1.5,
|
||||
|
@@ -58,6 +58,7 @@ type Restreamer interface {
|
||||
GetProcessMetadata(id app.ProcessID, key string) (interface{}, error) // Get previously set metadata from a process
|
||||
|
||||
Probe(config *app.Config, timeout time.Duration) app.Probe // Probe a process with specific timeout
|
||||
Validate(config *app.Config) error // Validate a process config
|
||||
}
|
||||
|
||||
// Config is the required configuration for a new restreamer instance.
|
||||
@@ -1920,3 +1921,27 @@ func hasPlaceholder(config *app.Config, r replace.Replacer, placeholder string)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (r *restream) Validate(config *app.Config) error {
|
||||
cfg := config.Clone()
|
||||
|
||||
resolveStaticPlaceholders(cfg, r.replace)
|
||||
|
||||
err := r.resolveAddresses(r.tasks, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resolveDynamicPlaceholder(cfg, r.replace, map[string]string{
|
||||
"hwdevice": "0",
|
||||
}, map[string]string{
|
||||
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
|
||||
_, err = validateConfig(cfg, r.fs.list, r.ffmpeg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user