Various updates

- rebrand group to domain
- move IAM to the API (rest and graph) for enforcing "process:" rules
- add abstraction layer for restream store in order to decouple internal format from format on disk
- move playout handler into restreamHandler
- remove user from restream interface
- add TaskID type that includes the process id and its domain
This commit is contained in:
Ingo Oppermann
2023-05-23 15:47:06 +02:00
parent 6f831fd190
commit ccac2ffd5d
36 changed files with 1822 additions and 1001 deletions

View File

@@ -36,6 +36,7 @@ import (
"github.com/datarhei/core/v16/restream/replace"
"github.com/datarhei/core/v16/restream/rewrite"
restreamstore "github.com/datarhei/core/v16/restream/store"
restreamjsonstore "github.com/datarhei/core/v16/restream/store/json"
"github.com/datarhei/core/v16/rtmp"
"github.com/datarhei/core/v16/service"
"github.com/datarhei/core/v16/session"
@@ -718,7 +719,7 @@ func (a *api) start() error {
if err != nil {
return err
}
store, err = restreamstore.NewJSON(restreamstore.JSONConfig{
store, err = restreamjsonstore.New(restreamjsonstore.Config{
Filesystem: fs,
Filepath: "/db.json",
Logger: a.log.logger.core.WithComponent("ProcessStore"),

View File

@@ -11,7 +11,7 @@ import (
"github.com/datarhei/core/v16/io/file"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
"github.com/Masterminds/semver/v3"
_ "github.com/joho/godotenv/autoload"
@@ -120,7 +120,7 @@ func doMigration(logger log.Logger, configstore cfgstore.Store) error {
logger.Info().WithField("backup", backupFilepath).Log("Backup created")
// Load the existing DB
datastore, err := store.NewJSON(store.JSONConfig{
datastore, err := jsonstore.New(jsonstore.Config{
Filepath: cfg.DB.Dir + "/db.json",
})
if err != nil {
@@ -135,31 +135,33 @@ func doMigration(logger log.Logger, configstore cfgstore.Store) error {
logger.Info().Log("Migrating processes ...")
// Migrate the processes to version 5
// Migrate the processes to FFmpeg version 5
// Only this happens:
// - for RTSP inputs, replace -stimeout with -timeout
reRTSP := regexp.MustCompile(`^rtsps?://`)
for id, p := range data.Process {
logger.Info().WithField("processid", p.ID).Log("")
for name, domain := range data.Process {
for id, p := range domain {
logger.Info().WithField("processid", p.Process.ID).Log("")
for index, input := range p.Config.Input {
if !reRTSP.MatchString(input.Address) {
continue
}
for i, o := range input.Options {
if o != "-stimeout" {
for index, input := range p.Process.Config.Input {
if !reRTSP.MatchString(input.Address) {
continue
}
input.Options[i] = "-timeout"
}
for i, o := range input.Options {
if o != "-stimeout" {
continue
}
p.Config.Input[index] = input
input.Options[i] = "-timeout"
}
p.Process.Config.Input[index] = input
}
p.Process.Config.FFVersion = version.String()
data.Process[name][id] = p
}
p.Config.FFVersion = version.String()
data.Process[id] = p
}
logger.Info().Log("Migrating processes done")

View File

@@ -22,6 +22,7 @@ import (
"github.com/datarhei/core/v16/restream"
"github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
"github.com/google/uuid"
)
@@ -497,13 +498,12 @@ type importConfigAudio struct {
sampling string
}
func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.StoreData, error) {
func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.Data, error) {
if len(cfg.id) == 0 {
cfg.id = uuid.New().String()
}
r := store.NewStoreData()
r.Version = 4
r := store.NewData()
jsondata, err := fs.ReadFile(path)
if err != nil {
@@ -1189,10 +1189,20 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.StoreData,
config.Output = append(config.Output, output)
process.Config = config
r.Process[process.ID] = process
r.Metadata.Process["restreamer-ui:ingest:"+cfg.id] = make(map[string]interface{})
r.Metadata.Process["restreamer-ui:ingest:"+cfg.id]["restreamer-ui"] = ui
p := store.Process{
Process: process,
Metadata: map[string]interface{}{},
}
if metadata, err := gojson.Marshal(ui); err == nil {
m := map[string]interface{}{}
gojson.Unmarshal(metadata, &m)
p.Metadata["restreamer-ui"] = m
}
r.Process[""] = map[string]store.Process{}
r.Process[""][process.ID] = p
// Snapshot
@@ -1242,9 +1252,13 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.StoreData,
snapshotConfig.Output = append(snapshotConfig.Output, snapshotOutput)
snapshotProcess.Config = snapshotConfig
r.Process[snapshotProcess.ID] = snapshotProcess
r.Metadata.Process["restreamer-ui:ingest:"+cfg.id+"_snapshot"] = nil
p := store.Process{
Process: snapshotProcess,
Metadata: nil,
}
r.Process[""][snapshotProcess.ID] = p
}
// Optional publication
@@ -1403,10 +1417,19 @@ func importV1(fs fs.Filesystem, path string, cfg importConfig) (store.StoreData,
config.Output = append(config.Output, output)
process.Config = config
r.Process[process.ID] = process
r.Metadata.Process[egressId] = make(map[string]interface{})
r.Metadata.Process[egressId]["restreamer-ui"] = egress
p := store.Process{
Process: process,
Metadata: map[string]interface{}{},
}
if metadata, err := gojson.Marshal(egress); err == nil {
m := map[string]interface{}{}
gojson.Unmarshal(metadata, &m)
p.Metadata["restreamer-ui"] = m
}
r.Process[""][process.ID] = p
}
return r, nil
@@ -1421,7 +1444,7 @@ func probeInput(binary string, config app.Config) app.Probe {
}
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
store, err := store.NewJSON(store.JSONConfig{
store, err := jsonstore.New(jsonstore.Config{
Filesystem: dummyfs,
Filepath: "/",
Logger: nil,
@@ -1452,8 +1475,10 @@ func probeInput(binary string, config app.Config) app.Probe {
}
rs.AddProcess(&config)
probe := rs.Probe(config.ID, "", "")
rs.DeleteProcess(config.ID, "", "")
id := restream.TaskID{ID: config.ID}
probe := rs.Probe(id)
rs.DeleteProcess(id)
return probe
}

View File

@@ -1,13 +1,11 @@
package main
import (
gojson "encoding/json"
"os"
"testing"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
"github.com/stretchr/testify/require"
)
@@ -42,37 +40,28 @@ func testV1Import(t *testing.T, v1Fixture, v4Fixture string, config importConfig
})
require.NoError(t, err)
store, err := jsonstore.New(jsonstore.Config{
Filesystem: diskfs,
Filepath: v4Fixture,
})
require.NoError(t, err)
// Import v1 database
v4, err := importV1(diskfs, v1Fixture, config)
require.Equal(t, nil, err)
require.NoError(t, err)
// Reset variants
for n := range v4.Process {
v4.Process[n].CreatedAt = 0
for m, domain := range v4.Process {
for n := range domain {
v4.Process[m][n].Process.CreatedAt = 0
}
}
// Convert to JSON
datav4, err := gojson.MarshalIndent(&v4, "", " ")
require.Equal(t, nil, err)
// Read the wanted result
wantdatav4, err := diskfs.ReadFile(v4Fixture)
require.Equal(t, nil, err)
wantv4, err := store.Load()
require.NoError(t, err)
var wantv4 store.StoreData
err = gojson.Unmarshal(wantdatav4, &wantv4)
require.Equal(t, nil, err, json.FormatError(wantdatav4, err))
// Convert to JSON
wantdatav4, err = gojson.MarshalIndent(&wantv4, "", " ")
require.Equal(t, nil, err)
// Re-convert both to golang type
gojson.Unmarshal(wantdatav4, &wantv4)
gojson.Unmarshal(datav4, &v4)
require.Equal(t, wantv4, v4)
require.Equal(t, wantv4, v4, v4Fixture)
}
func TestV1Import(t *testing.T) {

View File

@@ -8,7 +8,7 @@ import (
cfgvars "github.com/datarhei/core/v16/config/vars"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
_ "github.com/joho/godotenv/autoload"
)
@@ -87,7 +87,7 @@ func doImport(logger log.Logger, fs fs.Filesystem, configstore cfgstore.Store) e
logger.Info().Log("Found database")
// Load an existing DB
datastore, err := store.NewJSON(store.JSONConfig{
datastore, err := jsonstore.New(jsonstore.Config{
Filesystem: fs,
Filepath: cfg.DB.Dir + "/db.json",
})

View File

@@ -992,6 +992,12 @@ const docTemplate = `{
"schema": {
"$ref": "#/definitions/api.IAMUser"
}
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
}
],
"responses": {
@@ -1039,6 +1045,12 @@ const docTemplate = `{
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
}
],
"responses": {
@@ -1082,6 +1094,12 @@ const docTemplate = `{
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
},
{
"description": "User definition",
"name": "user",
@@ -1141,6 +1159,12 @@ const docTemplate = `{
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
}
],
"responses": {
@@ -1165,6 +1189,83 @@ const docTemplate = `{
}
}
},
"/api/v3/iam/user/{name}/policy": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Replace policies of an user",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Replace policies of an user",
"operationId": "iam-3-update-user",
"parameters": [
{
"type": "string",
"description": "Username",
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
},
{
"description": "Policy definitions",
"name": "user",
"in": "body",
"required": true,
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.IAMPolicy"
}
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.IAMPolicy"
}
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/log": {
"get": {
"security": [
@@ -3363,7 +3464,7 @@ const docTemplate = `{
"type": "string"
}
},
"group": {
"domain": {
"type": "string"
},
"resource": {

View File

@@ -985,6 +985,12 @@
"schema": {
"$ref": "#/definitions/api.IAMUser"
}
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
}
],
"responses": {
@@ -1032,6 +1038,12 @@
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
}
],
"responses": {
@@ -1075,6 +1087,12 @@
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
},
{
"description": "User definition",
"name": "user",
@@ -1134,6 +1152,12 @@
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
}
],
"responses": {
@@ -1158,6 +1182,83 @@
}
}
},
"/api/v3/iam/user/{name}/policy": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Replace policies of an user",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Replace policies of an user",
"operationId": "iam-3-update-user",
"parameters": [
{
"type": "string",
"description": "Username",
"name": "name",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain of the acting user",
"name": "domain",
"in": "query"
},
{
"description": "Policy definitions",
"name": "user",
"in": "body",
"required": true,
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.IAMPolicy"
}
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.IAMPolicy"
}
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/log": {
"get": {
"security": [
@@ -3356,7 +3457,7 @@
"type": "string"
}
},
"group": {
"domain": {
"type": "string"
},
"resource": {

View File

@@ -507,7 +507,7 @@ definitions:
items:
type: string
type: array
group:
domain:
type: string
resource:
type: string
@@ -2646,6 +2646,10 @@ paths:
required: true
schema:
$ref: '#/definitions/api.IAMUser'
- description: Domain of the acting user
in: query
name: domain
type: string
produces:
- application/json
responses:
@@ -2676,6 +2680,10 @@ paths:
name: name
required: true
type: string
- description: Domain of the acting user
in: query
name: domain
type: string
produces:
- application/json
responses:
@@ -2705,6 +2713,10 @@ paths:
name: name
required: true
type: string
- description: Domain of the acting user
in: query
name: domain
type: string
produces:
- application/json
responses:
@@ -2732,6 +2744,10 @@ paths:
name: name
required: true
type: string
- description: Domain of the acting user
in: query
name: domain
type: string
- description: User definition
in: body
name: user
@@ -2762,6 +2778,56 @@ paths:
summary: Replace an existing user
tags:
- v16.?.?
/api/v3/iam/user/{name}/policy:
put:
consumes:
- application/json
description: Replace policies of an user
operationId: iam-3-update-user
parameters:
- description: Username
in: path
name: name
required: true
type: string
- description: Domain of the acting user
in: query
name: domain
type: string
- description: Policy definitions
in: body
name: user
required: true
schema:
items:
$ref: '#/definitions/api.IAMPolicy'
type: array
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/api.IAMPolicy'
type: array
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Replace policies of an user
tags:
- v16.?.?
/api/v3/log:
get:
description: Get the last log lines of the Restreamer application

View File

@@ -62,7 +62,7 @@ type ProcessConfig struct {
func (cfg *ProcessConfig) Marshal() *app.Config {
p := &app.Config{
ID: cfg.ID,
Group: cfg.Group,
Domain: cfg.Group,
Reference: cfg.Reference,
Options: cfg.Options,
Reconnect: cfg.Reconnect,
@@ -142,7 +142,7 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
}
cfg.ID = c.ID
cfg.Group = c.Group
cfg.Group = c.Domain
cfg.Reference = c.Reference
cfg.Type = "ffmpeg"
cfg.Reconnect = c.Reconnect

View File

@@ -127,7 +127,7 @@ type ComplexityRoot struct {
Process struct {
Config func(childComplexity int) int
CreatedAt func(childComplexity int) int
Group func(childComplexity int) int
Domain func(childComplexity int) int
ID func(childComplexity int) int
Metadata func(childComplexity int) int
Owner func(childComplexity int) int
@@ -139,7 +139,7 @@ type ComplexityRoot struct {
ProcessConfig struct {
Autostart func(childComplexity int) int
Group func(childComplexity int) int
Domain func(childComplexity int) int
ID func(childComplexity int) int
Input func(childComplexity int) int
Limits func(childComplexity int) int
@@ -240,10 +240,10 @@ type ComplexityRoot struct {
Log func(childComplexity int) int
Metrics func(childComplexity int, query models.MetricsInput) int
Ping func(childComplexity int) int
PlayoutStatus func(childComplexity int, id string, group *string, input string) int
Probe func(childComplexity int, id string, group *string) int
Process func(childComplexity int, id string, group *string) int
Processes func(childComplexity int, idpattern *string, refpattern *string, group *string) int
PlayoutStatus func(childComplexity int, id string, domain string, input string) int
Probe func(childComplexity int, id string, domain string) int
Process func(childComplexity int, id string, domain string) int
Processes func(childComplexity int, idpattern *string, refpattern *string, domainpattern *string) int
}
RawAVstream struct {
@@ -287,10 +287,10 @@ type QueryResolver interface {
About(ctx context.Context) (*models.About, error)
Log(ctx context.Context) ([]string, error)
Metrics(ctx context.Context, query models.MetricsInput) (*models.Metrics, error)
PlayoutStatus(ctx context.Context, id string, group *string, input string) (*models.RawAVstream, error)
Processes(ctx context.Context, idpattern *string, refpattern *string, group *string) ([]*models.Process, error)
Process(ctx context.Context, id string, group *string) (*models.Process, error)
Probe(ctx context.Context, id string, group *string) (*models.Probe, error)
PlayoutStatus(ctx context.Context, id string, domain string, input string) (*models.RawAVstream, error)
Processes(ctx context.Context, idpattern *string, refpattern *string, domainpattern *string) ([]*models.Process, error)
Process(ctx context.Context, id string, domain string) (*models.Process, error)
Probe(ctx context.Context, id string, domain string) (*models.Probe, error)
}
type executableSchema struct {
@@ -679,12 +679,12 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Process.CreatedAt(childComplexity), true
case "Process.group":
if e.complexity.Process.Group == nil {
case "Process.domain":
if e.complexity.Process.Domain == nil {
break
}
return e.complexity.Process.Group(childComplexity), true
return e.complexity.Process.Domain(childComplexity), true
case "Process.id":
if e.complexity.Process.ID == nil {
@@ -742,12 +742,12 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.ProcessConfig.Autostart(childComplexity), true
case "ProcessConfig.group":
if e.complexity.ProcessConfig.Group == nil {
case "ProcessConfig.domain":
if e.complexity.ProcessConfig.Domain == nil {
break
}
return e.complexity.ProcessConfig.Group(childComplexity), true
return e.complexity.ProcessConfig.Domain(childComplexity), true
case "ProcessConfig.id":
if e.complexity.ProcessConfig.ID == nil {
@@ -1275,7 +1275,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return 0, false
}
return e.complexity.Query.PlayoutStatus(childComplexity, args["id"].(string), args["group"].(*string), args["input"].(string)), true
return e.complexity.Query.PlayoutStatus(childComplexity, args["id"].(string), args["domain"].(string), args["input"].(string)), true
case "Query.probe":
if e.complexity.Query.Probe == nil {
@@ -1287,7 +1287,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return 0, false
}
return e.complexity.Query.Probe(childComplexity, args["id"].(string), args["group"].(*string)), true
return e.complexity.Query.Probe(childComplexity, args["id"].(string), args["domain"].(string)), true
case "Query.process":
if e.complexity.Query.Process == nil {
@@ -1299,7 +1299,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return 0, false
}
return e.complexity.Query.Process(childComplexity, args["id"].(string), args["group"].(*string)), true
return e.complexity.Query.Process(childComplexity, args["id"].(string), args["domain"].(string)), true
case "Query.processes":
if e.complexity.Query.Processes == nil {
@@ -1311,7 +1311,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return 0, false
}
return e.complexity.Query.Processes(childComplexity, args["idpattern"].(*string), args["refpattern"].(*string), args["group"].(*string)), true
return e.complexity.Query.Processes(childComplexity, args["idpattern"].(*string), args["refpattern"].(*string), args["domainpattern"].(*string)), true
case "RawAVstream.aqueue":
if e.complexity.RawAVstream.Aqueue == nil {
@@ -1598,7 +1598,7 @@ type Metric {
}
`, BuiltIn: false},
{Name: "../playout.graphqls", Input: `extend type Query {
playoutStatus(id: ID!, group: String, input: ID!): RawAVstream
playoutStatus(id: ID!, domain: String!, input: ID!): RawAVstream
}
type RawAVstreamIO {
@@ -1634,9 +1634,13 @@ type RawAVstream {
}
`, BuiltIn: false},
{Name: "../process.graphqls", Input: `extend type Query {
processes(idpattern: String, refpattern: String, group: String): [Process!]!
process(id: ID!, group: String): Process
probe(id: ID!, group: String): Probe!
processes(
idpattern: String
refpattern: String
domainpattern: String
): [Process!]!
process(id: ID!, domain: String!): Process
probe(id: ID!, domain: String!): Probe!
}
type ProcessConfigIO {
@@ -1654,7 +1658,7 @@ type ProcessConfigLimits {
type ProcessConfig {
id: String!
owner: String!
group: String!
domain: String!
type: String!
reference: String!
input: [ProcessConfigIO!]!
@@ -1706,7 +1710,7 @@ type ProcessReport implements IProcessReportHistoryEntry {
type Process {
id: String!
owner: String!
group: String!
domain: String!
type: String!
reference: String!
created_at: Time!
@@ -1881,15 +1885,15 @@ func (ec *executionContext) field_Query_playoutStatus_args(ctx context.Context,
}
}
args["id"] = arg0
var arg1 *string
if tmp, ok := rawArgs["group"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("group"))
arg1, err = ec.unmarshalOString2string(ctx, tmp)
var arg1 string
if tmp, ok := rawArgs["domain"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("domain"))
arg1, err = ec.unmarshalNString2string(ctx, tmp)
if err != nil {
return nil, err
}
}
args["group"] = arg1
args["domain"] = arg1
var arg2 string
if tmp, ok := rawArgs["input"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("input"))
@@ -1914,15 +1918,15 @@ func (ec *executionContext) field_Query_probe_args(ctx context.Context, rawArgs
}
}
args["id"] = arg0
var arg1 *string
if tmp, ok := rawArgs["group"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("group"))
arg1, err = ec.unmarshalOString2string(ctx, tmp)
var arg1 string
if tmp, ok := rawArgs["domain"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("domain"))
arg1, err = ec.unmarshalNString2string(ctx, tmp)
if err != nil {
return nil, err
}
}
args["group"] = arg1
args["domain"] = arg1
return args, nil
}
@@ -1938,15 +1942,15 @@ func (ec *executionContext) field_Query_process_args(ctx context.Context, rawArg
}
}
args["id"] = arg0
var arg1 *string
if tmp, ok := rawArgs["group"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("group"))
arg1, err = ec.unmarshalOString2string(ctx, tmp)
var arg1 string
if tmp, ok := rawArgs["domain"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("domain"))
arg1, err = ec.unmarshalNString2string(ctx, tmp)
if err != nil {
return nil, err
}
}
args["group"] = arg1
args["domain"] = arg1
return args, nil
}
@@ -1972,14 +1976,14 @@ func (ec *executionContext) field_Query_processes_args(ctx context.Context, rawA
}
args["refpattern"] = arg1
var arg2 *string
if tmp, ok := rawArgs["group"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("group"))
if tmp, ok := rawArgs["domainpattern"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("domainpattern"))
arg2, err = ec.unmarshalOString2ᚖstring(ctx, tmp)
if err != nil {
return nil, err
}
}
args["group"] = arg2
args["domainpattern"] = arg2
return args, nil
}
@@ -4420,8 +4424,8 @@ func (ec *executionContext) fieldContext_Process_owner(ctx context.Context, fiel
return fc, nil
}
func (ec *executionContext) _Process_group(ctx context.Context, field graphql.CollectedField, obj *models.Process) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Process_group(ctx, field)
func (ec *executionContext) _Process_domain(ctx context.Context, field graphql.CollectedField, obj *models.Process) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Process_domain(ctx, field)
if err != nil {
return graphql.Null
}
@@ -4434,7 +4438,7 @@ func (ec *executionContext) _Process_group(ctx context.Context, field graphql.Co
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Group, nil
return obj.Domain, nil
})
if err != nil {
ec.Error(ctx, err)
@@ -4451,7 +4455,7 @@ func (ec *executionContext) _Process_group(ctx context.Context, field graphql.Co
return ec.marshalNString2string(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Process_group(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
func (ec *executionContext) fieldContext_Process_domain(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Process",
Field: field,
@@ -4639,8 +4643,8 @@ func (ec *executionContext) fieldContext_Process_config(ctx context.Context, fie
return ec.fieldContext_ProcessConfig_id(ctx, field)
case "owner":
return ec.fieldContext_ProcessConfig_owner(ctx, field)
case "group":
return ec.fieldContext_ProcessConfig_group(ctx, field)
case "domain":
return ec.fieldContext_ProcessConfig_domain(ctx, field)
case "type":
return ec.fieldContext_ProcessConfig_type(ctx, field)
case "reference":
@@ -4915,8 +4919,8 @@ func (ec *executionContext) fieldContext_ProcessConfig_owner(ctx context.Context
return fc, nil
}
func (ec *executionContext) _ProcessConfig_group(ctx context.Context, field graphql.CollectedField, obj *models.ProcessConfig) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_ProcessConfig_group(ctx, field)
func (ec *executionContext) _ProcessConfig_domain(ctx context.Context, field graphql.CollectedField, obj *models.ProcessConfig) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_ProcessConfig_domain(ctx, field)
if err != nil {
return graphql.Null
}
@@ -4929,7 +4933,7 @@ func (ec *executionContext) _ProcessConfig_group(ctx context.Context, field grap
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Group, nil
return obj.Domain, nil
})
if err != nil {
ec.Error(ctx, err)
@@ -4946,7 +4950,7 @@ func (ec *executionContext) _ProcessConfig_group(ctx context.Context, field grap
return ec.marshalNString2string(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_ProcessConfig_group(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
func (ec *executionContext) fieldContext_ProcessConfig_domain(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "ProcessConfig",
Field: field,
@@ -8352,7 +8356,7 @@ func (ec *executionContext) _Query_playoutStatus(ctx context.Context, field grap
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().PlayoutStatus(rctx, fc.Args["id"].(string), fc.Args["group"].(*string), fc.Args["input"].(string))
return ec.resolvers.Query().PlayoutStatus(rctx, fc.Args["id"].(string), fc.Args["domain"].(string), fc.Args["input"].(string))
})
if err != nil {
ec.Error(ctx, err)
@@ -8436,7 +8440,7 @@ func (ec *executionContext) _Query_processes(ctx context.Context, field graphql.
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().Processes(rctx, fc.Args["idpattern"].(*string), fc.Args["refpattern"].(*string), fc.Args["group"].(*string))
return ec.resolvers.Query().Processes(rctx, fc.Args["idpattern"].(*string), fc.Args["refpattern"].(*string), fc.Args["domainpattern"].(*string))
})
if err != nil {
ec.Error(ctx, err)
@@ -8465,8 +8469,8 @@ func (ec *executionContext) fieldContext_Query_processes(ctx context.Context, fi
return ec.fieldContext_Process_id(ctx, field)
case "owner":
return ec.fieldContext_Process_owner(ctx, field)
case "group":
return ec.fieldContext_Process_group(ctx, field)
case "domain":
return ec.fieldContext_Process_domain(ctx, field)
case "type":
return ec.fieldContext_Process_type(ctx, field)
case "reference":
@@ -8513,7 +8517,7 @@ func (ec *executionContext) _Query_process(ctx context.Context, field graphql.Co
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().Process(rctx, fc.Args["id"].(string), fc.Args["group"].(*string))
return ec.resolvers.Query().Process(rctx, fc.Args["id"].(string), fc.Args["domain"].(string))
})
if err != nil {
ec.Error(ctx, err)
@@ -8539,8 +8543,8 @@ func (ec *executionContext) fieldContext_Query_process(ctx context.Context, fiel
return ec.fieldContext_Process_id(ctx, field)
case "owner":
return ec.fieldContext_Process_owner(ctx, field)
case "group":
return ec.fieldContext_Process_group(ctx, field)
case "domain":
return ec.fieldContext_Process_domain(ctx, field)
case "type":
return ec.fieldContext_Process_type(ctx, field)
case "reference":
@@ -8587,7 +8591,7 @@ func (ec *executionContext) _Query_probe(ctx context.Context, field graphql.Coll
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Query().Probe(rctx, fc.Args["id"].(string), fc.Args["group"].(*string))
return ec.resolvers.Query().Probe(rctx, fc.Args["id"].(string), fc.Args["domain"].(string))
})
if err != nil {
ec.Error(ctx, err)
@@ -12258,9 +12262,9 @@ func (ec *executionContext) _Process(ctx context.Context, sel ast.SelectionSet,
if out.Values[i] == graphql.Null {
invalids++
}
case "group":
case "domain":
out.Values[i] = ec._Process_group(ctx, field, obj)
out.Values[i] = ec._Process_domain(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
@@ -12346,9 +12350,9 @@ func (ec *executionContext) _ProcessConfig(ctx context.Context, sel ast.Selectio
if out.Values[i] == graphql.Null {
invalids++
}
case "group":
case "domain":
out.Values[i] = ec._ProcessConfig_group(ctx, field, obj)
out.Values[i] = ec._ProcessConfig_domain(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++

View File

@@ -106,7 +106,7 @@ type ProbeIo struct {
type Process struct {
ID string `json:"id"`
Owner string `json:"owner"`
Group string `json:"group"`
Domain string `json:"domain"`
Type string `json:"type"`
Reference string `json:"reference"`
CreatedAt time.Time `json:"created_at"`
@@ -119,7 +119,7 @@ type Process struct {
type ProcessConfig struct {
ID string `json:"id"`
Owner string `json:"owner"`
Group string `json:"group"`
Domain string `json:"domain"`
Type string `json:"type"`
Reference string `json:"reference"`
Input []*ProcessConfigIo `json:"input"`

View File

@@ -1,5 +1,5 @@
extend type Query {
playoutStatus(id: ID!, group: String, input: ID!): RawAVstream
playoutStatus(id: ID!, domain: String!, input: ID!): RawAVstream
}
type RawAVstreamIO {

View File

@@ -1,7 +1,11 @@
extend type Query {
processes(idpattern: String, refpattern: String, group: String): [Process!]!
process(id: ID!, group: String): Process
probe(id: ID!, group: String): Probe!
processes(
idpattern: String
refpattern: String
domainpattern: String
): [Process!]!
process(id: ID!, domain: String!): Process
probe(id: ID!, domain: String!): Probe!
}
type ProcessConfigIO {
@@ -19,7 +23,7 @@ type ProcessConfigLimits {
type ProcessConfig {
id: String!
owner: String!
group: String!
domain: String!
type: String!
reference: String!
input: [ProcessConfigIO!]!
@@ -71,7 +75,7 @@ type ProcessReport implements IProcessReportHistoryEntry {
type Process {
id: String!
owner: String!
group: String!
domain: String!
type: String!
reference: String!
created_at: Time!

View File

@@ -11,13 +11,23 @@ import (
"github.com/datarhei/core/v16/http/graph/models"
"github.com/datarhei/core/v16/playout"
"github.com/datarhei/core/v16/restream"
)
// PlayoutStatus is the resolver for the playoutStatus field.
func (r *queryResolver) PlayoutStatus(ctx context.Context, id string, group *string, input string) (*models.RawAVstream, error) {
func (r *queryResolver) PlayoutStatus(ctx context.Context, id string, domain string, input string) (*models.RawAVstream, error) {
user, _ := ctx.Value("user").(string)
addr, err := r.Restream.GetPlayout(id, user, *group, input)
if !r.IAM.Enforce(user, domain, "process:"+id, "read") {
return nil, fmt.Errorf("forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
addr, err := r.Restream.GetPlayout(tid, input)
if err != nil {
return nil, fmt.Errorf("unknown process or input: %w", err)
}

View File

@@ -5,19 +5,25 @@ package resolver
import (
"context"
"fmt"
"github.com/datarhei/core/v16/http/graph/models"
"github.com/datarhei/core/v16/restream"
)
// Processes is the resolver for the processes field.
func (r *queryResolver) Processes(ctx context.Context, idpattern *string, refpattern *string, group *string) ([]*models.Process, error) {
func (r *queryResolver) Processes(ctx context.Context, idpattern *string, refpattern *string, domainpattern *string) ([]*models.Process, error) {
user, _ := ctx.Value(GraphKey("user")).(string)
ids := r.Restream.GetProcessIDs(*idpattern, *refpattern, user, *group)
ids := r.Restream.GetProcessIDs(*idpattern, *refpattern, "", *domainpattern)
procs := []*models.Process{}
for _, id := range ids {
p, err := r.getProcess(id, user, *group)
if !r.IAM.Enforce(user, id.Domain, "process:"+id.ID, "read") {
continue
}
p, err := r.getProcess(id)
if err != nil {
return nil, err
}
@@ -29,17 +35,35 @@ func (r *queryResolver) Processes(ctx context.Context, idpattern *string, refpat
}
// Process is the resolver for the process field.
func (r *queryResolver) Process(ctx context.Context, id string, group *string) (*models.Process, error) {
func (r *queryResolver) Process(ctx context.Context, id string, domain string) (*models.Process, error) {
user, _ := ctx.Value(GraphKey("user")).(string)
return r.getProcess(id, user, *group)
if !r.IAM.Enforce(user, domain, "process:"+id, "read") {
return nil, fmt.Errorf("forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
return r.getProcess(tid)
}
// Probe is the resolver for the probe field.
func (r *queryResolver) Probe(ctx context.Context, id string, group *string) (*models.Probe, error) {
func (r *queryResolver) Probe(ctx context.Context, id string, domain string) (*models.Probe, error) {
user, _ := ctx.Value(GraphKey("user")).(string)
probe := r.Restream.Probe(id, user, *group)
if !r.IAM.Enforce(user, domain, "process:"+id, "write") {
return nil, fmt.Errorf("forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
probe := r.Restream.Probe(tid)
p := &models.Probe{}
p.UnmarshalRestream(probe)

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/datarhei/core/v16/http/graph/models"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/monitor"
"github.com/datarhei/core/v16/restream"
@@ -20,25 +21,26 @@ type Resolver struct {
Restream restream.Restreamer
Monitor monitor.HistoryReader
LogBuffer log.BufferWriter
IAM iam.IAM
}
func (r *queryResolver) getProcess(id, user, group string) (*models.Process, error) {
process, err := r.Restream.GetProcess(id, user, group)
func (r *queryResolver) getProcess(id restream.TaskID) (*models.Process, error) {
process, err := r.Restream.GetProcess(id)
if err != nil {
return nil, err
}
state, err := r.Restream.GetProcessState(id, user, group)
state, err := r.Restream.GetProcessState(id)
if err != nil {
return nil, err
}
report, err := r.Restream.GetProcessLog(id, user, group)
report, err := r.Restream.GetProcessLog(id)
if err != nil {
return nil, err
}
m, err := r.Restream.GetProcessMetadata(id, user, group, "")
m, err := r.Restream.GetProcessMetadata(id, "")
if err != nil {
return nil, err
}

View File

@@ -16,19 +16,7 @@ import (
"github.com/labstack/echo/v4"
)
// The PlayoutHandler type provides handlers for accessing the playout API of a process
type PlayoutHandler struct {
restream restream.Restreamer
}
// NewPlayout returns a new Playout type. You have to provide a Restreamer instance.
func NewPlayout(restream restream.Restreamer) *PlayoutHandler {
return &PlayoutHandler{
restream: restream,
}
}
// Status return the current playout status
// PlayoutStatus return the current playout status
// @Summary Get the current playout status
// @Description Get the current playout status of an input of a process
// @Tags v16.7.2
@@ -41,13 +29,22 @@ func NewPlayout(restream restream.Restreamer) *PlayoutHandler {
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/{id}/playout/{inputid}/status [get]
func (h *PlayoutHandler) Status(c echo.Context) error {
func (h *RestreamHandler) PlayoutStatus(c echo.Context) error {
id := util.PathParam(c, "id")
inputid := util.PathParam(c, "inputid")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
addr, err := h.restream.GetPlayout(id, user, group, inputid)
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
addr, err := h.restream.GetPlayout(tid, inputid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process or input", "%s", err)
}
@@ -84,7 +81,7 @@ func (h *PlayoutHandler) Status(c echo.Context) error {
return c.Blob(response.StatusCode, response.Header.Get("content-type"), data)
}
// Keyframe returns the last keyframe
// PlayoutKeyframe returns the last keyframe
// @Summary Get the last keyframe
// @Description Get the last keyframe of an input of a process. The extension of the name determines the return type.
// @Tags v16.7.2
@@ -100,14 +97,23 @@ func (h *PlayoutHandler) Status(c echo.Context) error {
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/{id}/playout/{inputid}/keyframe/{name} [get]
func (h *PlayoutHandler) Keyframe(c echo.Context) error {
func (h *RestreamHandler) PlayoutKeyframe(c echo.Context) error {
id := util.PathParam(c, "id")
inputid := util.PathParam(c, "inputid")
name := util.PathWildcardParam(c)
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
addr, err := h.restream.GetPlayout(id, user, group, inputid)
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
addr, err := h.restream.GetPlayout(tid, inputid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process or input", "%s", err)
}
@@ -136,7 +142,7 @@ func (h *PlayoutHandler) Keyframe(c echo.Context) error {
return c.Blob(response.StatusCode, response.Header.Get("content-type"), data)
}
// EncodeErrorframe encodes the errorframe
// PlayoutEncodeErrorframe encodes the errorframe
// @Summary Encode the errorframe
// @Description Immediately encode the errorframe (if available and looping)
// @Tags v16.7.2
@@ -150,13 +156,22 @@ func (h *PlayoutHandler) Keyframe(c echo.Context) error {
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/{id}/playout/{inputid}/errorframe/encode [get]
func (h *PlayoutHandler) EncodeErrorframe(c echo.Context) error {
func (h *RestreamHandler) PlayoutEncodeErrorframe(c echo.Context) error {
id := util.PathParam(c, "id")
inputid := util.PathParam(c, "inputid")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
addr, err := h.restream.GetPlayout(id, user, group, inputid)
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
addr, err := h.restream.GetPlayout(tid, inputid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process or input", "%s", err)
}
@@ -179,7 +194,7 @@ func (h *PlayoutHandler) EncodeErrorframe(c echo.Context) error {
return c.Blob(response.StatusCode, response.Header.Get("content-type"), data)
}
// SetErrorframe sets an errorframe
// PlayoutSetErrorframe sets an errorframe
// @Summary Upload an error frame
// @Description Upload an error frame which will be encoded immediately
// @Tags v16.7.2
@@ -196,13 +211,22 @@ func (h *PlayoutHandler) EncodeErrorframe(c echo.Context) error {
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/{id}/playout/{inputid}/errorframe/{name} [post]
func (h *PlayoutHandler) SetErrorframe(c echo.Context) error {
func (h *RestreamHandler) PlayoutSetErrorframe(c echo.Context) error {
id := util.PathParam(c, "id")
inputid := util.PathParam(c, "inputid")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
addr, err := h.restream.GetPlayout(id, user, group, inputid)
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
addr, err := h.restream.GetPlayout(tid, inputid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process or input", "%s", err)
}
@@ -230,7 +254,7 @@ func (h *PlayoutHandler) SetErrorframe(c echo.Context) error {
return c.Blob(response.StatusCode, response.Header.Get("content-type"), data)
}
// ReopenInput closes the current input stream
// PlayoutReopenInput closes the current input stream
// @Summary Close the current input stream
// @Description Close the current input stream such that it will be automatically re-opened
// @Tags v16.7.2
@@ -243,13 +267,22 @@ func (h *PlayoutHandler) SetErrorframe(c echo.Context) error {
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/{id}/playout/{inputid}/reopen [get]
func (h *PlayoutHandler) ReopenInput(c echo.Context) error {
func (h *RestreamHandler) PlayoutReopenInput(c echo.Context) error {
id := util.PathParam(c, "id")
inputid := util.PathParam(c, "inputid")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
addr, err := h.restream.GetPlayout(id, user, group, inputid)
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
addr, err := h.restream.GetPlayout(tid, inputid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process or input", "%s", err)
}
@@ -272,7 +305,7 @@ func (h *PlayoutHandler) ReopenInput(c echo.Context) error {
return c.Blob(response.StatusCode, response.Header.Get("content-type"), data)
}
// SetStream replaces the current stream
// PlayoutSetStream replaces the current stream
// @Summary Switch to a new stream
// @Description Replace the current stream with the one from the given URL. The switch will only happen if the stream parameters match.
// @Tags v16.7.2
@@ -288,13 +321,22 @@ func (h *PlayoutHandler) ReopenInput(c echo.Context) error {
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/process/{id}/playout/{inputid}/stream [put]
func (h *PlayoutHandler) SetStream(c echo.Context) error {
func (h *RestreamHandler) PlayoutSetStream(c echo.Context) error {
id := util.PathParam(c, "id")
inputid := util.PathParam(c, "inputid")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
addr, err := h.restream.GetPlayout(id, user, group, inputid)
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
addr, err := h.restream.GetPlayout(tid, inputid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process or input", "%s", err)
}
@@ -322,7 +364,7 @@ func (h *PlayoutHandler) SetStream(c echo.Context) error {
return c.Blob(response.StatusCode, response.Header.Get("content-type"), data)
}
func (h *PlayoutHandler) request(method, addr, path, contentType string, data []byte) (*http.Response, error) {
func (h *RestreamHandler) request(method, addr, path, contentType string, data []byte) (*http.Response, error) {
endpoint := "http://" + addr + path
body := bytes.NewBuffer(data)

View File

@@ -6,6 +6,7 @@ import (
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/restream"
"github.com/labstack/echo/v4"
@@ -15,12 +16,14 @@ import (
// The RestreamHandler type provides functions to interact with a Restreamer instance
type RestreamHandler struct {
restream restream.Restreamer
iam iam.IAM
}
// NewRestream return a new Restream type. You have to provide a valid Restreamer instance.
func NewRestream(restream restream.Restreamer) *RestreamHandler {
func NewRestream(restream restream.Restreamer, iam iam.IAM) *RestreamHandler {
return &RestreamHandler{
restream: restream,
iam: iam,
}
}
@@ -49,6 +52,10 @@ func (h *RestreamHandler) Add(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
if !h.iam.Enforce(user, process.Group, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
if process.Type != "ffmpeg" {
return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg")
}
@@ -64,7 +71,12 @@ func (h *RestreamHandler) Add(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error())
}
p, _ := h.getProcess(config.ID, config.Owner, config.Group, "config")
tid := restream.TaskID{
ID: config.ID,
Domain: config.Domain,
}
p, _ := h.getProcess(tid, config.Owner, "config")
return c.JSON(http.StatusOK, p.Config)
}
@@ -92,15 +104,24 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
idpattern := util.DefaultQuery(c, "idpattern", "")
refpattern := util.DefaultQuery(c, "refpattern", "")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
ids := h.restream.GetProcessIDs(idpattern, refpattern, user, group)
preids := h.restream.GetProcessIDs(idpattern, refpattern, "", "")
ids := []restream.TaskID{}
for _, id := range preids {
if !h.iam.Enforce(user, domain, "process:"+id.ID, "read") {
continue
}
ids = append(ids, id)
}
processes := []api.Process{}
if len(wantids) == 0 || len(reference) != 0 {
for _, id := range ids {
if p, err := h.getProcess(id, user, group, filter); err == nil {
if p, err := h.getProcess(id, user, filter); err == nil {
if len(reference) != 0 && p.Reference != reference {
continue
}
@@ -110,8 +131,8 @@ func (h *RestreamHandler) GetAll(c echo.Context) error {
} else {
for _, id := range ids {
for _, wantid := range wantids {
if wantid == id {
if p, err := h.getProcess(id, user, group, filter); err == nil {
if wantid == id.ID {
if p, err := h.getProcess(id, user, filter); err == nil {
processes = append(processes, p)
}
}
@@ -138,9 +159,18 @@ func (h *RestreamHandler) Get(c echo.Context) error {
id := util.PathParam(c, "id")
filter := util.DefaultQuery(c, "filter", "")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
p, err := h.getProcess(id, user, group, filter)
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
p, err := h.getProcess(tid, user, filter)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
@@ -162,13 +192,22 @@ func (h *RestreamHandler) Get(c echo.Context) error {
func (h *RestreamHandler) Delete(c echo.Context) error {
id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
if err := h.restream.StopProcess(id, user, group); err != nil {
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
if err := h.restream.StopProcess(tid); err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
if err := h.restream.DeleteProcess(id, user, group); err != nil {
if err := h.restream.DeleteProcess(tid); err != nil {
return api.Err(http.StatusInternalServerError, "Process can't be deleted", "%s", err)
}
@@ -192,7 +231,7 @@ func (h *RestreamHandler) Delete(c echo.Context) error {
func (h *RestreamHandler) Update(c echo.Context) error {
id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
process := api.ProcessConfig{
ID: id,
@@ -200,7 +239,16 @@ func (h *RestreamHandler) Update(c echo.Context) error {
Autostart: true,
}
current, err := h.restream.GetProcess(id, user, group)
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
current, err := h.restream.GetProcess(tid)
if err != nil {
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
}
@@ -215,7 +263,16 @@ func (h *RestreamHandler) Update(c echo.Context) error {
config := process.Marshal()
config.Owner = user
if err := h.restream.UpdateProcess(id, user, group, config); err != nil {
if !h.iam.Enforce(user, config.Domain, "process:"+config.ID, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid = restream.TaskID{
ID: id,
Domain: domain,
}
if err := h.restream.UpdateProcess(tid, config); err != nil {
if err == restream.ErrUnknownProcess {
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
}
@@ -223,7 +280,12 @@ func (h *RestreamHandler) Update(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err)
}
p, _ := h.getProcess(config.ID, config.Owner, config.Group, "config")
tid = restream.TaskID{
ID: config.ID,
Domain: config.Domain,
}
p, _ := h.getProcess(tid, config.Owner, "config")
return c.JSON(http.StatusOK, p.Config)
}
@@ -245,7 +307,11 @@ func (h *RestreamHandler) Update(c echo.Context) error {
func (h *RestreamHandler) Command(c echo.Context) error {
id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
var command api.Command
@@ -253,15 +319,20 @@ func (h *RestreamHandler) Command(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
var err error
if command.Command == "start" {
err = h.restream.StartProcess(id, user, group)
err = h.restream.StartProcess(tid)
} else if command.Command == "stop" {
err = h.restream.StopProcess(id, user, group)
err = h.restream.StopProcess(tid)
} else if command.Command == "restart" {
err = h.restream.RestartProcess(id, user, group)
err = h.restream.RestartProcess(tid)
} else if command.Command == "reload" {
err = h.restream.ReloadProcess(id, user, group)
err = h.restream.ReloadProcess(tid)
} else {
return api.Err(http.StatusBadRequest, "Unknown command provided", "Known commands are: start, stop, reload, restart")
}
@@ -288,9 +359,18 @@ func (h *RestreamHandler) Command(c echo.Context) error {
func (h *RestreamHandler) GetConfig(c echo.Context) error {
id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
p, err := h.restream.GetProcess(id, user, group)
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
p, err := h.restream.GetProcess(tid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
@@ -316,9 +396,18 @@ func (h *RestreamHandler) GetConfig(c echo.Context) error {
func (h *RestreamHandler) GetState(c echo.Context) error {
id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
s, err := h.restream.GetProcessState(id, user, group)
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
s, err := h.restream.GetProcessState(tid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
@@ -344,9 +433,18 @@ func (h *RestreamHandler) GetState(c echo.Context) error {
func (h *RestreamHandler) GetReport(c echo.Context) error {
id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
l, err := h.restream.GetProcessLog(id, user, group)
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
l, err := h.restream.GetProcessLog(tid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
@@ -370,9 +468,18 @@ func (h *RestreamHandler) GetReport(c echo.Context) error {
func (h *RestreamHandler) Probe(c echo.Context) error {
id := util.PathParam(c, "id")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
probe := h.restream.Probe(id, user, group)
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
probe := h.restream.Probe(tid)
apiprobe := api.Probe{}
apiprobe.Unmarshal(&probe)
@@ -434,9 +541,18 @@ func (h *RestreamHandler) GetProcessMetadata(c echo.Context) error {
id := util.PathParam(c, "id")
key := util.PathParam(c, "key")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
data, err := h.restream.GetProcessMetadata(id, user, group, key)
if !h.iam.Enforce(user, domain, "process:"+id, "read") {
return api.Err(http.StatusForbidden, "Forbidden")
}
tid := restream.TaskID{
ID: id,
Domain: domain,
}
data, err := h.restream.GetProcessMetadata(tid, key)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
@@ -462,7 +578,11 @@ func (h *RestreamHandler) SetProcessMetadata(c echo.Context) error {
id := util.PathParam(c, "id")
key := util.PathParam(c, "key")
user := util.DefaultContext(c, "user", "")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "")
if !h.iam.Enforce(user, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
if len(key) == 0 {
return api.Err(http.StatusBadRequest, "Invalid key", "The key must not be of length 0")
@@ -474,7 +594,12 @@ func (h *RestreamHandler) SetProcessMetadata(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
if err := h.restream.SetProcessMetadata(id, key, user, group, data); err != nil {
tid := restream.TaskID{
ID: id,
Domain: domain,
}
if err := h.restream.SetProcessMetadata(tid, key, data); err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
@@ -536,7 +661,7 @@ func (h *RestreamHandler) SetMetadata(c echo.Context) error {
return c.JSON(http.StatusOK, data)
}
func (h *RestreamHandler) getProcess(id, user, group, filterString string) (api.Process, error) {
func (h *RestreamHandler) getProcess(id restream.TaskID, user, filterString string) (api.Process, error) {
filter := strings.FieldsFunc(filterString, func(r rune) bool {
return r == rune(',')
})
@@ -560,7 +685,7 @@ func (h *RestreamHandler) getProcess(id, user, group, filterString string) (api.
}
}
process, err := h.restream.GetProcess(id, user, group)
process, err := h.restream.GetProcess(id)
if err != nil {
return api.Process{}, err
}
@@ -579,21 +704,21 @@ func (h *RestreamHandler) getProcess(id, user, group, filterString string) (api.
}
if wants["state"] {
if state, err := h.restream.GetProcessState(id, user, group); err == nil {
if state, err := h.restream.GetProcessState(id); err == nil {
info.State = &api.ProcessState{}
info.State.Unmarshal(state)
}
}
if wants["report"] {
if log, err := h.restream.GetProcessLog(id, user, group); err == nil {
if log, err := h.restream.GetProcessLog(id); err == nil {
info.Report = &api.ProcessReport{}
info.Report.Unmarshal(log)
}
}
if wants["metadata"] {
if data, err := h.restream.GetProcessMetadata(id, "", user, group); err == nil {
if data, err := h.restream.GetProcessMetadata(id, ""); err == nil {
info.Metadata = api.NewMetadata(data)
}
}

View File

@@ -3,11 +3,14 @@ package api
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"testing"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/mock"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/io/fs"
"github.com/labstack/echo/v4"
"github.com/stretchr/testify/require"
@@ -25,7 +28,29 @@ func getDummyRestreamHandler() (*RestreamHandler, error) {
return nil, err
}
handler := NewRestream(rs)
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
if err != nil {
return nil, fmt.Errorf("failed to create memory filesystem: %w", err)
}
iam, err := iam.NewIAM(iam.Config{
FS: memfs,
Superuser: iam.User{
Name: "foobar",
},
JWTRealm: "",
JWTSecret: "",
Logger: nil,
})
if err != nil {
return nil, err
}
iam.AddPolicy("$anon", "$none", "api:/**", []string{"ANY"})
iam.AddPolicy("$anon", "$none", "fs:/**", []string{"ANY"})
iam.AddPolicy("$anon", "$none", "process:**", []string{"ANY"})
handler := NewRestream(rs, iam)
return handler, nil
}

View File

@@ -43,18 +43,23 @@ func NewWidget(config WidgetConfig) *WidgetHandler {
// @Router /api/v3/widget/process/{id} [get]
func (w *WidgetHandler) Get(c echo.Context) error {
id := util.PathParam(c, "id")
group := util.DefaultQuery(c, "group", "")
domain := util.DefaultQuery(c, "domain", "$none")
if w.restream == nil {
return api.Err(http.StatusNotFound, "Unknown process ID")
}
process, err := w.restream.GetProcess(id, "", group)
tid := restream.TaskID{
ID: id,
Domain: domain,
}
process, err := w.restream.GetProcess(tid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}
state, err := w.restream.GetProcessState(id, "", group)
state, err := w.restream.GetProcessState(tid)
if err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err)
}

View File

@@ -20,7 +20,7 @@ import (
"github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/restream"
"github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
"github.com/invopop/jsonschema"
"github.com/labstack/echo/v4"
@@ -39,7 +39,7 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) {
return nil, fmt.Errorf("failed to create memory filesystem: %w", err)
}
store, err := store.NewJSON(store.JSONConfig{
store, err := jsonstore.New(jsonstore.Config{
Filesystem: memfs,
})
if err != nil {
@@ -66,9 +66,6 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) {
return nil, err
}
iam.AddPolicy("$anon", "$none", "api:/**", []string{"ANY"})
iam.AddPolicy("$anon", "$none", "fs:/**", []string{"ANY"})
rs, err := restream.New(restream.Config{
Store: store,
FFmpeg: ffmpeg,

View File

@@ -119,7 +119,6 @@ type server struct {
v3handler struct {
log *api.LogHandler
restream *api.RestreamHandler
playout *api.PlayoutHandler
rtmp *api.RTMPHandler
srt *api.SRTHandler
config *api.ConfigHandler
@@ -246,10 +245,7 @@ func NewServer(config Config) (Server, error) {
if config.Restream != nil {
s.v3handler.restream = api.NewRestream(
config.Restream,
)
s.v3handler.playout = api.NewPlayout(
config.Restream,
config.IAM,
)
}
@@ -326,6 +322,7 @@ func NewServer(config Config) (Server, error) {
Restream: config.Restream,
Monitor: config.Metrics,
LogBuffer: config.LogBuffer,
IAM: config.IAM,
}, "/api/graph/query")
s.gzip.mimetypes = []string{
@@ -570,18 +567,16 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
}
// v3 Playout
if s.v3handler.playout != nil {
v3.GET("/process/:id/playout/:inputid/status", s.v3handler.playout.Status)
v3.GET("/process/:id/playout/:inputid/reopen", s.v3handler.playout.ReopenInput)
v3.GET("/process/:id/playout/:inputid/keyframe/*", s.v3handler.playout.Keyframe)
v3.GET("/process/:id/playout/:inputid/errorframe/encode", s.v3handler.playout.EncodeErrorframe)
v3.GET("/process/:id/playout/:inputid/status", s.v3handler.restream.PlayoutStatus)
v3.GET("/process/:id/playout/:inputid/reopen", s.v3handler.restream.PlayoutReopenInput)
v3.GET("/process/:id/playout/:inputid/keyframe/*", s.v3handler.restream.PlayoutKeyframe)
v3.GET("/process/:id/playout/:inputid/errorframe/encode", s.v3handler.restream.PlayoutEncodeErrorframe)
if !s.readOnly {
v3.PUT("/process/:id/playout/:inputid/errorframe/*", s.v3handler.playout.SetErrorframe)
v3.POST("/process/:id/playout/:inputid/errorframe/*", s.v3handler.playout.SetErrorframe)
if !s.readOnly {
v3.PUT("/process/:id/playout/:inputid/errorframe/*", s.v3handler.restream.PlayoutSetErrorframe)
v3.POST("/process/:id/playout/:inputid/errorframe/*", s.v3handler.restream.PlayoutSetErrorframe)
v3.PUT("/process/:id/playout/:inputid/stream", s.v3handler.playout.SetStream)
}
v3.PUT("/process/:id/playout/:inputid/stream", s.v3handler.restream.PlayoutSetStream)
}
}

View File

@@ -57,46 +57,46 @@ func (c *restreamCollector) Collect() metric.Metrics {
"starting": 0,
}
ids := c.r.GetProcessIDs("", "", "$superuser", "$none")
ids := c.r.GetProcessIDs("", "", "", "")
for _, id := range ids {
state, _ := c.r.GetProcessState(id, "$superuser", "$none")
state, _ := c.r.GetProcessState(id)
if state == nil {
continue
}
proc, _ := c.r.GetProcess(id, "$superuser", "$none")
proc, _ := c.r.GetProcess(id)
if proc == nil {
continue
}
states[state.State]++
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Frame), id, state.State, state.Order, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.FPS), id, state.State, state.Order, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Speed), id, state.State, state.Order, "speed"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Quantizer, id, state.State, state.Order, "q"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Size), id, state.State, state.Order, "size"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Time, id, state.State, state.Order, "time"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Drop), id, state.State, state.Order, "drop"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Dup), id, state.State, state.Order, "dup"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Packet), id, state.State, state.Order, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Bitrate, id, state.State, state.Order, "bitrate"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.CPU, id, state.State, state.Order, "cpu"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Memory), id, state.State, state.Order, "memory"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Duration, id, state.State, state.Order, "uptime"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Frame), id.String(), state.State, state.Order, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.FPS), id.String(), state.State, state.Order, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Speed), id.String(), state.State, state.Order, "speed"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Quantizer, id.String(), state.State, state.Order, "q"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Size), id.String(), state.State, state.Order, "size"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Time, id.String(), state.State, state.Order, "time"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Drop), id.String(), state.State, state.Order, "drop"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Dup), id.String(), state.State, state.Order, "dup"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Progress.Packet), id.String(), state.State, state.Order, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Progress.Bitrate, id.String(), state.State, state.Order, "bitrate"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.CPU, id.String(), state.State, state.Order, "cpu"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(state.Memory), id.String(), state.State, state.Order, "memory"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, state.Duration, id.String(), state.State, state.Order, "uptime"))
if proc.Config != nil {
metrics.Add(metric.NewValue(c.restreamProcessDescr, proc.Config.LimitCPU, id, state.State, state.Order, "cpu_limit"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(proc.Config.LimitMemory), id, state.State, state.Order, "memory_limit"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, proc.Config.LimitCPU, id.String(), state.State, state.Order, "cpu_limit"))
metrics.Add(metric.NewValue(c.restreamProcessDescr, float64(proc.Config.LimitMemory), id.String(), state.State, state.Order, "memory_limit"))
}
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Failed), id, "failed"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Finished), id, "finished"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Finishing), id, "finishing"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Killed), id, "killed"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Running), id, "running"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Starting), id, "starting"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Failed), id.String(), "failed"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Finished), id.String(), "finished"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Finishing), id.String(), "finishing"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Killed), id.String(), "killed"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Running), id.String(), "running"))
metrics.Add(metric.NewValue(c.restreamProcessStatesDescr, float64(state.States.Starting), id.String(), "starting"))
for i := range state.Progress.Input {
io := &state.Progress.Input[i]
@@ -104,32 +104,32 @@ func (c *restreamCollector) Collect() metric.Metrics {
index := strconv.FormatUint(io.Index, 10)
stream := strconv.FormatUint(io.Stream, 10)
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Frame), id, "input", io.ID, io.Address, index, stream, io.Type, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.FPS), id, "input", io.ID, io.Address, index, stream, io.Type, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Packet), id, "input", io.ID, io.Address, index, stream, io.Type, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.PPS), id, "input", io.ID, io.Address, index, stream, io.Type, "pps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Size), id, "input", io.ID, io.Address, index, stream, io.Type, "size"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Bitrate), id, "input", io.ID, io.Address, index, stream, io.Type, "bitrate"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Frame), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.FPS), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Packet), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.PPS), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "pps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Size), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "size"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Bitrate), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "bitrate"))
if io.AVstream != nil {
a := io.AVstream
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Queue), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_queue"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Dup), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_dup"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Drop), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_drop"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Enc), id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_enc"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Queue), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "avstream_queue"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Dup), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "avstream_dup"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Drop), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "avstream_drop"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(a.Enc), id.String(), "input", io.ID, io.Address, index, stream, io.Type, "avstream_enc"))
value = 0
if a.Looping {
value = 1
}
metrics.Add(metric.NewValue(c.restreamProcessIODescr, value, id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_looping"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, value, id.String(), "input", io.ID, io.Address, index, stream, io.Type, "avstream_looping"))
value = 0
if a.Duplicating {
value = 1
}
metrics.Add(metric.NewValue(c.restreamProcessIODescr, value, id, "input", io.ID, io.Address, index, stream, io.Type, "avstream_duplicating"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, value, id.String(), "input", io.ID, io.Address, index, stream, io.Type, "avstream_duplicating"))
}
}
@@ -139,13 +139,13 @@ func (c *restreamCollector) Collect() metric.Metrics {
index := strconv.FormatUint(io.Index, 10)
stream := strconv.FormatUint(io.Stream, 10)
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Frame), id, "output", io.ID, io.Address, index, stream, io.Type, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.FPS), id, "output", io.ID, io.Address, index, stream, io.Type, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Packet), id, "output", io.ID, io.Address, index, stream, io.Type, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.PPS), id, "output", io.ID, io.Address, index, stream, io.Type, "pps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Size), id, "output", io.ID, io.Address, index, stream, io.Type, "size"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Bitrate), id, "output", io.ID, io.Address, index, stream, io.Type, "bitrate"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Quantizer), id, "output", io.ID, io.Address, index, stream, io.Type, "q"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Frame), id.String(), "output", io.ID, io.Address, index, stream, io.Type, "frame"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.FPS), id.String(), "output", io.ID, io.Address, index, stream, io.Type, "fps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Packet), id.String(), "output", io.ID, io.Address, index, stream, io.Type, "packet"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.PPS), id.String(), "output", io.ID, io.Address, index, stream, io.Type, "pps"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Size), id.String(), "output", io.ID, io.Address, index, stream, io.Type, "size"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Bitrate), id.String(), "output", io.ID, io.Address, index, stream, io.Type, "bitrate"))
metrics.Add(metric.NewValue(c.restreamProcessIODescr, float64(io.Quantizer), id.String(), "output", io.ID, io.Address, index, stream, io.Type, "q"))
}
}

View File

@@ -5,17 +5,17 @@ import (
)
type ConfigIOCleanup struct {
Pattern string `json:"pattern"`
MaxFiles uint `json:"max_files"`
MaxFileAge uint `json:"max_file_age_seconds"`
PurgeOnDelete bool `json:"purge_on_delete"`
Pattern string
MaxFiles uint
MaxFileAge uint
PurgeOnDelete bool
}
type ConfigIO struct {
ID string `json:"id"`
Address string `json:"address"`
Options []string `json:"options"`
Cleanup []ConfigIOCleanup `json:"cleanup"`
ID string
Address string
Options []string
Cleanup []ConfigIOCleanup
}
func (io ConfigIO) Clone() ConfigIO {
@@ -34,27 +34,29 @@ func (io ConfigIO) Clone() ConfigIO {
}
type Config struct {
ID string `json:"id"`
Reference string `json:"reference"`
Owner string `json:"owner"`
Group string `json:"group"`
FFVersion string `json:"ffversion"`
Input []ConfigIO `json:"input"`
Output []ConfigIO `json:"output"`
Options []string `json:"options"`
Reconnect bool `json:"reconnect"`
ReconnectDelay uint64 `json:"reconnect_delay_seconds"` // seconds
Autostart bool `json:"autostart"`
StaleTimeout uint64 `json:"stale_timeout_seconds"` // seconds
LimitCPU float64 `json:"limit_cpu_usage"` // percent
LimitMemory uint64 `json:"limit_memory_bytes"` // bytes
LimitWaitFor uint64 `json:"limit_waitfor_seconds"` // seconds
ID string
Reference string
Owner string
Domain string
FFVersion string
Input []ConfigIO
Output []ConfigIO
Options []string
Reconnect bool
ReconnectDelay uint64 // seconds
Autostart bool
StaleTimeout uint64 // seconds
LimitCPU float64 // percent
LimitMemory uint64 // bytes
LimitWaitFor uint64 // seconds
}
func (config *Config) Clone() *Config {
clone := &Config{
ID: config.ID,
Reference: config.Reference,
Owner: config.Owner,
Domain: config.Domain,
FFVersion: config.FFVersion,
Reconnect: config.Reconnect,
ReconnectDelay: config.ReconnectDelay,
@@ -104,19 +106,21 @@ func (config *Config) CreateCommand() []string {
}
type Process struct {
ID string `json:"id"`
Owner string `json:"owner"`
Group string `json:"group"`
Reference string `json:"reference"`
Config *Config `json:"config"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
Order string `json:"order"`
ID string
Owner string
Domain string
Reference string
Config *Config
CreatedAt int64
UpdatedAt int64
Order string
}
func (process *Process) Clone() *Process {
clone := &Process{
ID: process.ID,
Owner: process.Owner,
Domain: process.Domain,
Reference: process.Reference,
Config: process.Config.Clone(),
CreatedAt: process.CreatedAt,

View File

@@ -26,6 +26,7 @@ import (
"github.com/datarhei/core/v16/restream/replace"
"github.com/datarhei/core/v16/restream/rewrite"
"github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
"github.com/Masterminds/semver/v3"
)
@@ -43,22 +44,22 @@ type Restreamer interface {
SetMetadata(key string, data interface{}) error // Set general metadata
GetMetadata(key string) (interface{}, error) // Get previously set general metadata
AddProcess(config *app.Config) error // Add a new process
GetProcessIDs(idpattern, refpattern, user, group string) []string // Get a list of process IDs based on patterns for ID and reference
DeleteProcess(id, user, group string) error // Delete a process
UpdateProcess(id, user, group string, config *app.Config) error // Update a process
StartProcess(id, user, group string) error // Start a process
StopProcess(id, user, group string) error // Stop a process
RestartProcess(id, user, group string) error // Restart a process
ReloadProcess(id, user, group string) error // Reload a process
GetProcess(id, user, group string) (*app.Process, error) // Get a process
GetProcessState(id, user, group string) (*app.State, error) // Get the state of a process
GetProcessLog(id, user, group string) (*app.Log, error) // Get the logs of a process
GetPlayout(id, user, group, inputid string) (string, error) // Get the URL of the playout API for a process
Probe(id, user, group string) app.Probe // Probe a process
ProbeWithTimeout(id, user, group string, timeout time.Duration) app.Probe // Probe a process with specific timeout
SetProcessMetadata(id, user, group, key string, data interface{}) error // Set metatdata to a process
GetProcessMetadata(id, user, group, key string) (interface{}, error) // Get previously set metadata from a process
AddProcess(config *app.Config) error // Add a new process
GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []TaskID // Get a list of process IDs based on patterns for ID and reference
DeleteProcess(id TaskID) error // Delete a process
UpdateProcess(id TaskID, config *app.Config) error // Update a process
StartProcess(id TaskID) error // Start a process
StopProcess(id TaskID) error // Stop a process
RestartProcess(id TaskID) error // Restart a process
ReloadProcess(id TaskID) error // Reload a process
GetProcess(id TaskID) (*app.Process, error) // Get a process
GetProcessState(id TaskID) (*app.State, error) // Get the state of a process
GetProcessLog(id TaskID) (*app.Log, error) // Get the logs of a process
GetPlayout(id TaskID, inputid string) (string, error) // Get the URL of the playout API for a process
Probe(id TaskID) app.Probe // Probe a process
ProbeWithTimeout(id TaskID, timeout time.Duration) app.Probe // Probe a process with specific timeout
SetProcessMetadata(id TaskID, key string, data interface{}) error // Set metatdata to a process
GetProcessMetadata(id TaskID, key string) (interface{}, error) // Get previously set metadata from a process
}
// Config is the required configuration for a new restreamer instance.
@@ -79,7 +80,7 @@ type task struct {
valid bool
id string // ID of the task/process
owner string
group string
domain string
reference string
process *app.Process
config *app.Config
@@ -92,12 +93,32 @@ type task struct {
metadata map[string]interface{}
}
func newTaskid(id, group string) string {
return id + "~" + group
func (t *task) ID() TaskID {
return TaskID{
ID: t.id,
Domain: t.domain,
}
}
func (t *task) String() string {
return newTaskid(t.id, t.group)
return t.ID().String()
}
type TaskID struct {
ID string
Domain string
}
func (t TaskID) String() string {
return t.ID + "@" + t.Domain
}
func (t TaskID) Equals(b TaskID) bool {
if t.ID == b.ID && t.Domain == b.Domain {
return true
}
return false
}
type restream struct {
@@ -115,9 +136,9 @@ type restream struct {
}
replace replace.Replacer
rewrite rewrite.Rewriter
tasks map[string]*task
tasks map[TaskID]*task // domain:processid
metadata map[string]interface{} // global metadata
logger log.Logger
metadata map[string]interface{}
lock sync.RWMutex
@@ -150,7 +171,7 @@ func New(config Config) (Restreamer, error) {
if r.store == nil {
dummyfs, _ := fs.NewMemFilesystem(fs.MemConfig{})
s, err := store.NewJSON(store.JSONConfig{
s, err := jsonstore.New(jsonstore.Config{
Filesystem: dummyfs,
})
if err != nil {
@@ -229,8 +250,7 @@ func (r *restream) Stop() {
r.lock.Lock()
defer r.lock.Unlock()
// Stop the currently running processes without
// altering their order such that on a subsequent
// Stop the currently running processes without altering their order such that on a subsequent
// Start() they will get restarted.
for id, t := range r.tasks {
if t.ffmpeg != nil {
@@ -297,7 +317,7 @@ func (r *restream) load() error {
return err
}
tasks := make(map[string]*task)
tasks := make(map[TaskID]*task)
skills := r.ffmpeg.Skills()
ffversion := skills.FFmpeg.Version
@@ -306,44 +326,39 @@ func (r *restream) load() error {
ffversion = fmt.Sprintf("%d.%d.0", v.Major(), v.Minor())
}
for _, process := range data.Process {
if len(process.Config.FFVersion) == 0 {
process.Config.FFVersion = "^" + ffversion
for _, domain := range data.Process {
for _, p := range domain {
if len(p.Process.Config.FFVersion) == 0 {
p.Process.Config.FFVersion = "^" + ffversion
}
t := &task{
id: p.Process.ID,
owner: p.Process.Owner,
domain: p.Process.Domain,
reference: p.Process.Reference,
process: p.Process,
config: p.Process.Config.Clone(),
logger: r.logger.WithFields(log.Fields{
"id": p.Process.ID,
"owner": p.Process.Owner,
"domain": p.Process.Domain,
}),
}
t.metadata = p.Metadata
// Replace all placeholders in the config
resolvePlaceholders(t.config, r.replace)
tasks[t.ID()] = t
}
t := &task{
id: process.ID,
owner: process.Owner,
group: process.Group,
reference: process.Reference,
process: process,
config: process.Config.Clone(),
logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"owner": process.Owner,
"group": process.Group,
}),
}
// Replace all placeholders in the config
resolvePlaceholders(t.config, r.replace)
tasks[t.String()] = t
}
for tid, userdata := range data.Metadata.Process {
t, ok := tasks[tid]
if !ok {
continue
}
t.metadata = userdata
}
// Now that all tasks are defined and all placeholders are
// replaced, we can resolve references and validate the
// inputs and outputs.
for tid, t := range tasks {
for _, t := range tasks {
// Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version
if c, err := semver.NewConstraint(t.config.FFVersion); err == nil {
if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil {
@@ -360,11 +375,6 @@ func (r *restream) load() error {
t.logger.Warn().WithError(err).Log("")
}
if !r.enforce(t.owner, t.group, t.id, "CREATE") {
t.logger.Warn().WithError(fmt.Errorf("forbidden")).Log("Ignoring")
continue
}
err := r.resolveAddresses(tasks, t.config)
if err != nil {
t.logger.Warn().WithError(err).Log("Ignoring")
@@ -384,7 +394,7 @@ func (r *restream) load() error {
}
t.command = t.config.CreateCommand()
t.parser = r.ffmpeg.NewProcessParser(t.logger, tid, t.reference)
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.String(), t.reference)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
@@ -406,37 +416,33 @@ func (r *restream) load() error {
}
r.tasks = tasks
r.metadata = data.Metadata.System
r.metadata = data.Metadata
return nil
}
func (r *restream) save() {
data := store.NewStoreData()
data := store.NewData()
for tid, t := range r.tasks {
data.Process[tid] = t.process
data.Metadata.System = r.metadata
data.Metadata.Process[tid] = t.metadata
domain := data.Process[tid.Domain]
if domain == nil {
domain = map[string]store.Process{}
}
domain[tid.ID] = store.Process{
Process: t.process.Clone(),
Metadata: t.metadata,
}
data.Process[tid.Domain] = domain
}
data.Metadata = r.metadata
r.store.Store(data)
}
func (r *restream) enforce(name, group, processid, action string) bool {
if len(name) == 0 {
// This is for backwards compatibility. Existing processes don't have an owner.
// All processes that will be added later will have an owner ($anon, ...).
name = r.iam.GetDefaultVerifier().Name()
}
if len(group) == 0 {
group = "$none"
}
return r.iam.Enforce(name, group, "process:"+processid, action)
}
func (r *restream) ID() string {
return r.id
}
@@ -455,10 +461,6 @@ var ErrProcessExists = errors.New("process already exists")
var ErrForbidden = errors.New("forbidden")
func (r *restream) AddProcess(config *app.Config) error {
if !r.enforce(config.Owner, config.Group, config.ID, "CREATE") {
return ErrForbidden
}
r.lock.RLock()
t, err := r.createTask(config)
r.lock.RUnlock()
@@ -470,7 +472,7 @@ func (r *restream) AddProcess(config *app.Config) error {
r.lock.Lock()
defer r.lock.Unlock()
tid := t.String()
tid := t.ID()
_, ok := r.tasks[tid]
if ok {
@@ -510,7 +512,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
process := &app.Process{
ID: config.ID,
Group: config.Group,
Domain: config.Domain,
Reference: config.Reference,
Config: config.Clone(),
Order: "stop",
@@ -525,13 +527,13 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
t := &task{
id: config.ID,
group: config.Group,
domain: config.Domain,
reference: process.Reference,
process: process,
config: process.Config.Clone(),
logger: r.logger.WithFields(log.Fields{
"id": process.ID,
"group": process.Group,
"group": process.Domain,
}),
}
@@ -576,7 +578,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
return t, nil
}
func (r *restream) setCleanup(id string, config *app.Config) {
func (r *restream) setCleanup(id TaskID, config *app.Config) {
rePrefix := regexp.MustCompile(`^([a-z]+):`)
for _, output := range config.Output {
@@ -607,7 +609,7 @@ func (r *restream) setCleanup(id string, config *app.Config) {
PurgeOnDelete: c.PurgeOnDelete,
}
fs.SetCleanup(id, []rfs.Pattern{
fs.SetCleanup(id.String(), []rfs.Pattern{
pattern,
})
@@ -617,9 +619,9 @@ func (r *restream) setCleanup(id string, config *app.Config) {
}
}
func (r *restream) unsetCleanup(id string) {
func (r *restream) unsetCleanup(id TaskID) {
for _, fs := range r.fs.list {
fs.UnsetCleanup(id)
fs.UnsetCleanup(id.String())
}
}
@@ -873,7 +875,7 @@ func (r *restream) validateOutputAddress(address, basedir string) (string, bool,
return "file:" + address, true, nil
}
func (r *restream) resolveAddresses(tasks map[string]*task, config *app.Config) error {
func (r *restream) resolveAddresses(tasks map[TaskID]*task, config *app.Config) error {
for i, input := range config.Input {
// Resolve any references
address, err := r.resolveAddress(tasks, config.ID, input.Address)
@@ -889,7 +891,7 @@ func (r *restream) resolveAddresses(tasks map[string]*task, config *app.Config)
return nil
}
func (r *restream) resolveAddress(tasks map[string]*task, id, address string) (string, error) {
func (r *restream) resolveAddress(tasks map[TaskID]*task, id, address string) (string, error) {
matches, err := parseAddressReference(address)
if err != nil {
return address, err
@@ -907,7 +909,7 @@ func (r *restream) resolveAddress(tasks map[string]*task, id, address string) (s
var t *task = nil
for _, tsk := range tasks {
if tsk.id == matches["id"] && tsk.group == matches["group"] {
if tsk.id == matches["id"] && tsk.domain == matches["group"] {
t = tsk
break
}
@@ -1024,28 +1026,24 @@ func parseAddressReference(address string) (map[string]string, error) {
return results, nil
}
func (r *restream) UpdateProcess(id, user, group string, config *app.Config) error {
if !r.enforce(user, group, id, "UPDATE") {
return ErrForbidden
}
func (r *restream) UpdateProcess(id TaskID, config *app.Config) error {
r.lock.Lock()
defer r.lock.Unlock()
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
t, err := r.createTask(config)
if err != nil {
return err
}
tid := newTaskid(id, group)
tid := t.ID()
task, ok := r.tasks[tid]
if !ok {
return ErrUnknownProcess
}
if tid != t.String() {
_, ok := r.tasks[t.String()]
if !tid.Equals(id) {
_, ok := r.tasks[tid]
if ok {
return ErrProcessExists
}
@@ -1053,11 +1051,11 @@ func (r *restream) UpdateProcess(id, user, group string, config *app.Config) err
t.process.Order = task.process.Order
if err := r.stopProcess(tid); err != nil {
if err := r.stopProcess(id); err != nil {
return fmt.Errorf("stop process: %w", err)
}
if err := r.deleteProcess(tid); err != nil {
if err := r.deleteProcess(id); err != nil {
return fmt.Errorf("delete process: %w", err)
}
@@ -1066,8 +1064,6 @@ func (r *restream) UpdateProcess(id, user, group string, config *app.Config) err
t.process.UpdatedAt = time.Now().Unix()
task.parser.TransferReportHistory(t.parser)
tid = t.String()
r.tasks[tid] = t
// set filesystem cleanup rules
@@ -1082,105 +1078,83 @@ func (r *restream) UpdateProcess(id, user, group string, config *app.Config) err
return nil
}
func (r *restream) GetProcessIDs(idpattern, refpattern, user, group string) []string {
func (r *restream) GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []TaskID {
r.lock.RLock()
defer r.lock.RUnlock()
if len(idpattern) == 0 && len(refpattern) == 0 {
ids := []string{}
for _, t := range r.tasks {
if t.group != group {
continue
}
if !r.enforce(user, group, t.id, "GET") {
continue
}
ids = append(ids, t.id)
}
return ids
}
idmap := map[string]int{}
count := 0
if len(idpattern) != 0 {
for _, t := range r.tasks {
if t.group != group {
continue
}
if !r.enforce(user, group, t.id, "GET") {
continue
}
ids := []TaskID{}
for _, t := range r.tasks {
count := 0
matches := 0
if len(idpattern) != 0 {
count++
match, err := glob.Match(idpattern, t.id)
if err != nil {
return nil
}
if !match {
continue
if match {
matches++
}
idmap[t.id]++
}
count++
}
if len(refpattern) != 0 {
for _, t := range r.tasks {
if t.group != group {
continue
}
if !r.enforce(user, group, t.id, "GET") {
continue
}
if len(refpattern) != 0 {
count++
match, err := glob.Match(refpattern, t.reference)
if err != nil {
return nil
}
if !match {
continue
if match {
matches++
}
idmap[t.id]++
}
count++
}
if len(ownerpattern) != 0 {
count++
match, err := glob.Match(ownerpattern, t.owner)
if err != nil {
return nil
}
ids := []string{}
if match {
matches++
}
}
for id, n := range idmap {
if n != count {
if len(domainpattern) != 0 {
count++
match, err := glob.Match(domainpattern, t.domain)
if err != nil {
return nil
}
if match {
matches++
}
}
if count != matches {
continue
}
ids = append(ids, id)
tid := TaskID{
ID: t.id,
Domain: t.domain,
}
ids = append(ids, tid)
}
return ids
}
func (r *restream) GetProcess(id, user, group string) (*app.Process, error) {
if !r.enforce(user, group, id, "GET") {
return nil, ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) GetProcess(id TaskID) (*app.Process, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[tid]
task, ok := r.tasks[id]
if !ok {
return &app.Process{}, ErrUnknownProcess
}
@@ -1190,17 +1164,11 @@ func (r *restream) GetProcess(id, user, group string) (*app.Process, error) {
return process, nil
}
func (r *restream) DeleteProcess(id, user, group string) error {
if !r.enforce(user, group, id, "DELETE") {
return ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) DeleteProcess(id TaskID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.deleteProcess(tid)
err := r.deleteProcess(id)
if err != nil {
return err
}
@@ -1210,14 +1178,14 @@ func (r *restream) DeleteProcess(id, user, group string) error {
return nil
}
func (r *restream) deleteProcess(tid string) error {
func (r *restream) deleteProcess(tid TaskID) error {
task, ok := r.tasks[tid]
if !ok {
return ErrUnknownProcess
}
if task.process.Order != "stop" {
return fmt.Errorf("the process with the ID '%s' is still running", task.id)
return fmt.Errorf("the process with the ID '%s' is still running", tid)
}
r.unsetPlayoutPorts(task)
@@ -1228,17 +1196,11 @@ func (r *restream) deleteProcess(tid string) error {
return nil
}
func (r *restream) StartProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) StartProcess(id TaskID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.startProcess(tid)
err := r.startProcess(id)
if err != nil {
return err
}
@@ -1248,7 +1210,7 @@ func (r *restream) StartProcess(id, user, group string) error {
return nil
}
func (r *restream) startProcess(tid string) error {
func (r *restream) startProcess(tid TaskID) error {
task, ok := r.tasks[tid]
if !ok {
return ErrUnknownProcess
@@ -1277,17 +1239,11 @@ func (r *restream) startProcess(tid string) error {
return nil
}
func (r *restream) StopProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) StopProcess(id TaskID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.stopProcess(tid)
err := r.stopProcess(id)
if err != nil {
return err
}
@@ -1297,7 +1253,7 @@ func (r *restream) StopProcess(id, user, group string) error {
return nil
}
func (r *restream) stopProcess(tid string) error {
func (r *restream) stopProcess(tid TaskID) error {
task, ok := r.tasks[tid]
if !ok {
return ErrUnknownProcess
@@ -1322,20 +1278,14 @@ func (r *restream) stopProcess(tid string) error {
return nil
}
func (r *restream) RestartProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) RestartProcess(id TaskID) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.restartProcess(tid)
return r.restartProcess(id)
}
func (r *restream) restartProcess(tid string) error {
func (r *restream) restartProcess(tid TaskID) error {
task, ok := r.tasks[tid]
if !ok {
return ErrUnknownProcess
@@ -1354,17 +1304,11 @@ func (r *restream) restartProcess(tid string) error {
return nil
}
func (r *restream) ReloadProcess(id, user, group string) error {
if !r.enforce(user, group, id, "COMMAND") {
return ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) ReloadProcess(id TaskID) error {
r.lock.Lock()
defer r.lock.Unlock()
err := r.reloadProcess(tid)
err := r.reloadProcess(id)
if err != nil {
return err
}
@@ -1374,7 +1318,7 @@ func (r *restream) ReloadProcess(id, user, group string) error {
return nil
}
func (r *restream) reloadProcess(tid string) error {
func (r *restream) reloadProcess(tid TaskID) error {
t, ok := r.tasks[tid]
if !ok {
return ErrUnknownProcess
@@ -1409,7 +1353,7 @@ func (r *restream) reloadProcess(tid string) error {
r.stopProcess(tid)
}
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference)
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.String(), t.reference)
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
Reconnect: t.config.Reconnect,
@@ -1436,19 +1380,13 @@ func (r *restream) reloadProcess(tid string) error {
return nil
}
func (r *restream) GetProcessState(id, user, group string) (*app.State, error) {
func (r *restream) GetProcessState(id TaskID) (*app.State, error) {
state := &app.State{}
if !r.enforce(user, group, id, "GET") {
return state, ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[tid]
task, ok := r.tasks[id]
if !ok {
return state, ErrUnknownProcess
}
@@ -1505,19 +1443,13 @@ func (r *restream) GetProcessState(id, user, group string) (*app.State, error) {
return state, nil
}
func (r *restream) GetProcessLog(id, user, group string) (*app.Log, error) {
func (r *restream) GetProcessLog(id TaskID) (*app.Log, error) {
log := &app.Log{}
if !r.enforce(user, group, id, "GET") {
return log, ErrForbidden
}
tid := newTaskid(id, group)
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[tid]
task, ok := r.tasks[id]
if !ok {
return log, ErrUnknownProcess
}
@@ -1560,23 +1492,16 @@ func (r *restream) GetProcessLog(id, user, group string) (*app.Log, error) {
return log, nil
}
func (r *restream) Probe(id, user, group string) app.Probe {
return r.ProbeWithTimeout(id, user, group, 20*time.Second)
func (r *restream) Probe(id TaskID) app.Probe {
return r.ProbeWithTimeout(id, 20*time.Second)
}
func (r *restream) ProbeWithTimeout(id, user, group string, timeout time.Duration) app.Probe {
func (r *restream) ProbeWithTimeout(id TaskID, timeout time.Duration) app.Probe {
appprobe := app.Probe{}
if !r.enforce(user, group, id, "PROBE") {
appprobe.Log = append(appprobe.Log, ErrForbidden.Error())
return appprobe
}
tid := newTaskid(id, group)
r.lock.RLock()
task, ok := r.tasks[tid]
task, ok := r.tasks[id]
if !ok {
appprobe.Log = append(appprobe.Log, fmt.Sprintf("Unknown process ID (%s)", id))
r.lock.RUnlock()
@@ -1640,17 +1565,11 @@ func (r *restream) ReloadSkills() error {
return r.ffmpeg.ReloadSkills()
}
func (r *restream) GetPlayout(id, user, group, inputid string) (string, error) {
if !r.enforce(user, group, id, "PLAYOUT") {
return "", ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) GetPlayout(id TaskID, inputid string) (string, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[tid]
task, ok := r.tasks[id]
if !ok {
return "", ErrUnknownProcess
}
@@ -1669,21 +1588,15 @@ func (r *restream) GetPlayout(id, user, group, inputid string) (string, error) {
var ErrMetadataKeyNotFound = errors.New("unknown key")
func (r *restream) SetProcessMetadata(id, user, group, key string, data interface{}) error {
if !r.enforce(user, group, id, "METADATA") {
return ErrForbidden
}
func (r *restream) SetProcessMetadata(id TaskID, key string, data interface{}) error {
if len(key) == 0 {
return fmt.Errorf("a key for storing the data has to be provided")
}
tid := newTaskid(id, group)
r.lock.Lock()
defer r.lock.Unlock()
task, ok := r.tasks[tid]
task, ok := r.tasks[id]
if !ok {
return ErrUnknownProcess
}
@@ -1707,17 +1620,11 @@ func (r *restream) SetProcessMetadata(id, user, group, key string, data interfac
return nil
}
func (r *restream) GetProcessMetadata(id, user, group, key string) (interface{}, error) {
if !r.enforce(user, group, id, "METADATA") {
return nil, ErrForbidden
}
tid := newTaskid(id, group)
func (r *restream) GetProcessMetadata(id TaskID, key string) (interface{}, error) {
r.lock.RLock()
defer r.lock.RUnlock()
task, ok := r.tasks[tid]
task, ok := r.tasks[id]
if !ok {
return nil, ErrUnknownProcess
}
@@ -1784,7 +1691,7 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) {
"processid": config.ID,
"owner": config.Owner,
"reference": config.Reference,
"group": config.Group,
"group": config.Domain,
}
for i, option := range config.Options {

View File

@@ -116,16 +116,18 @@ func TestAddProcess(t *testing.T) {
process := getDummyProcess()
require.NotNil(t, process)
_, err = rs.GetProcess(process.ID, "", "")
tid := TaskID{ID: process.ID}
_, err = rs.GetProcess(tid)
require.Equal(t, ErrUnknownProcess, err)
err = rs.AddProcess(process)
require.Equal(t, nil, err, "Failed to add process (%s)", err)
_, err = rs.GetProcess(process.ID, "", "")
_, err = rs.GetProcess(tid)
require.Equal(t, nil, err, "Set process not found (%s)", process.ID)
state, _ := rs.GetProcessState(process.ID, "", "")
state, _ := rs.GetProcessState(tid)
require.Equal(t, "stop", state.Order, "Process should be stopped")
}
@@ -136,12 +138,14 @@ func TestAutostartProcess(t *testing.T) {
process := getDummyProcess()
process.Autostart = true
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
state, _ := rs.GetProcessState(process.ID, "", "")
state, _ := rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID, "", "")
rs.StopProcess(tid)
}
func TestAddInvalidProcess(t *testing.T) {
@@ -217,14 +221,15 @@ func TestRemoveProcess(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
err = rs.AddProcess(process)
require.Equal(t, nil, err, "Failed to add process (%s)", err)
err = rs.DeleteProcess(process.ID, "", "")
err = rs.DeleteProcess(tid)
require.Equal(t, nil, err, "Set process not found (%s)", process.ID)
_, err = rs.GetProcess(process.ID, "", "")
_, err = rs.GetProcess(tid)
require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID)
}
@@ -235,10 +240,12 @@ func TestUpdateProcess(t *testing.T) {
process1 := getDummyProcess()
require.NotNil(t, process1)
process1.ID = "process1"
tid1 := TaskID{ID: process1.ID}
process2 := getDummyProcess()
require.NotNil(t, process2)
process2.ID = "process2"
tid2 := TaskID{ID: process2.ID}
err = rs.AddProcess(process1)
require.Equal(t, nil, err)
@@ -246,7 +253,7 @@ func TestUpdateProcess(t *testing.T) {
err = rs.AddProcess(process2)
require.Equal(t, nil, err)
process, err := rs.GetProcess(process2.ID, "", "")
process, err := rs.GetProcess(tid2)
require.NoError(t, err)
createdAt := process.CreatedAt
@@ -257,18 +264,20 @@ func TestUpdateProcess(t *testing.T) {
process3 := getDummyProcess()
require.NotNil(t, process3)
process3.ID = "process2"
tid3 := TaskID{ID: process3.ID}
err = rs.UpdateProcess("process1", "", "", process3)
err = rs.UpdateProcess(tid1, process3)
require.Error(t, err)
process3.ID = "process3"
err = rs.UpdateProcess("process1", "", "", process3)
tid3.ID = process3.ID
err = rs.UpdateProcess(tid1, process3)
require.NoError(t, err)
_, err = rs.GetProcess(process1.ID, "", "")
_, err = rs.GetProcess(tid1)
require.Error(t, err)
process, err = rs.GetProcess(process3.ID, "", "")
process, err = rs.GetProcess(tid3)
require.NoError(t, err)
require.NotEqual(t, createdAt, process.CreatedAt) // this should be equal, but will require a major version jump
@@ -282,51 +291,64 @@ func TestGetProcess(t *testing.T) {
process1 := getDummyProcess()
process1.ID = "foo_aaa_1"
process1.Reference = "foo_aaa_1"
tid1 := TaskID{ID: process1.ID}
process2 := getDummyProcess()
process2.ID = "bar_bbb_2"
process2.Reference = "bar_bbb_2"
tid2 := TaskID{ID: process2.ID}
process3 := getDummyProcess()
process3.ID = "foo_ccc_3"
process3.Reference = "foo_ccc_3"
tid3 := TaskID{ID: process3.ID}
process4 := getDummyProcess()
process4.ID = "bar_ddd_4"
process4.Reference = "bar_ddd_4"
tid4 := TaskID{ID: process4.ID}
rs.AddProcess(process1)
rs.AddProcess(process2)
rs.AddProcess(process3)
rs.AddProcess(process4)
_, err = rs.GetProcess(process1.ID, "", "")
_, err = rs.GetProcess(tid1)
require.Equal(t, nil, err)
_, err = rs.GetProcess(tid2)
require.Equal(t, nil, err)
_, err = rs.GetProcess(tid3)
require.Equal(t, nil, err)
_, err = rs.GetProcess(tid4)
require.Equal(t, nil, err)
list := rs.GetProcessIDs("", "", "", "")
require.Len(t, list, 4)
require.ElementsMatch(t, []string{"foo_aaa_1", "bar_bbb_2", "foo_ccc_3", "bar_ddd_4"}, list)
require.ElementsMatch(t, []TaskID{{ID: "foo_aaa_1"}, {ID: "bar_bbb_2"}, {ID: "foo_ccc_3"}, {ID: "bar_ddd_4"}}, list)
list = rs.GetProcessIDs("foo_*", "", "", "")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list)
require.ElementsMatch(t, []TaskID{{ID: "foo_aaa_1"}, {ID: "foo_ccc_3"}}, list)
list = rs.GetProcessIDs("bar_*", "", "", "")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list)
require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}, {ID: "bar_ddd_4"}}, list)
list = rs.GetProcessIDs("*_bbb_*", "", "", "")
require.Len(t, list, 1)
require.ElementsMatch(t, []string{"bar_bbb_2"}, list)
require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}}, list)
list = rs.GetProcessIDs("", "foo_*", "", "")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list)
require.ElementsMatch(t, []TaskID{{ID: "foo_aaa_1"}, {ID: "foo_ccc_3"}}, list)
list = rs.GetProcessIDs("", "bar_*", "", "")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list)
require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}, {ID: "bar_ddd_4"}}, list)
list = rs.GetProcessIDs("", "*_bbb_*", "", "")
require.Len(t, list, 1)
require.ElementsMatch(t, []string{"bar_bbb_2"}, list)
require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}}, list)
}
func TestStartProcess(t *testing.T) {
@@ -334,25 +356,26 @@ func TestStartProcess(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
err = rs.StartProcess("foobar", "", "")
err = rs.StartProcess(TaskID{ID: "foobar"})
require.NotEqual(t, nil, err, "shouldn't be able to start non-existing process")
err = rs.StartProcess(process.ID, "", "")
err = rs.StartProcess(tid)
require.Equal(t, nil, err, "should be able to start existing process")
state, _ := rs.GetProcessState(process.ID, "", "")
state, _ := rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
err = rs.StartProcess(process.ID, "", "")
err = rs.StartProcess(tid)
require.Equal(t, nil, err, "should be able to start already running process")
state, _ = rs.GetProcessState(process.ID, "", "")
state, _ = rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID, "", "")
rs.StopProcess(tid)
}
func TestStopProcess(t *testing.T) {
@@ -360,23 +383,24 @@ func TestStopProcess(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
rs.StartProcess(process.ID, "", "")
rs.StartProcess(tid)
err = rs.StopProcess("foobar", "", "")
err = rs.StopProcess(TaskID{ID: "foobar"})
require.NotEqual(t, nil, err, "shouldn't be able to stop non-existing process")
err = rs.StopProcess(process.ID, "", "")
err = rs.StopProcess(tid)
require.Equal(t, nil, err, "should be able to stop existing running process")
state, _ := rs.GetProcessState(process.ID, "", "")
state, _ := rs.GetProcessState(tid)
require.Equal(t, "stop", state.Order, "Process should be stopped")
err = rs.StopProcess(process.ID, "", "")
err = rs.StopProcess(tid)
require.Equal(t, nil, err, "should be able to stop already stopped process")
state, _ = rs.GetProcessState(process.ID, "", "")
state, _ = rs.GetProcessState(tid)
require.Equal(t, "stop", state.Order, "Process should be stopped")
}
@@ -385,24 +409,25 @@ func TestRestartProcess(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
err = rs.RestartProcess("foobar", "", "")
err = rs.RestartProcess(TaskID{ID: "foobar"})
require.NotEqual(t, nil, err, "shouldn't be able to restart non-existing process")
err = rs.RestartProcess(process.ID, "", "")
err = rs.RestartProcess(tid)
require.Equal(t, nil, err, "should be able to restart existing stopped process")
state, _ := rs.GetProcessState(process.ID, "", "")
state, _ := rs.GetProcessState(tid)
require.Equal(t, "stop", state.Order, "Process should be stopped")
rs.StartProcess(process.ID, "", "")
rs.StartProcess(tid)
state, _ = rs.GetProcessState(process.ID, "", "")
state, _ = rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID, "", "")
rs.StopProcess(tid)
}
func TestReloadProcess(t *testing.T) {
@@ -410,30 +435,31 @@ func TestReloadProcess(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
err = rs.ReloadProcess("foobar", "", "")
err = rs.ReloadProcess(TaskID{ID: "foobar"})
require.NotEqual(t, nil, err, "shouldn't be able to reload non-existing process")
err = rs.ReloadProcess(process.ID, "", "")
err = rs.ReloadProcess(tid)
require.Equal(t, nil, err, "should be able to reload existing stopped process")
state, _ := rs.GetProcessState(process.ID, "", "")
state, _ := rs.GetProcessState(tid)
require.Equal(t, "stop", state.Order, "Process should be stopped")
rs.StartProcess(process.ID, "", "")
rs.StartProcess(tid)
state, _ = rs.GetProcessState(process.ID, "", "")
state, _ = rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
err = rs.ReloadProcess(process.ID, "", "")
err = rs.ReloadProcess(tid)
require.Equal(t, nil, err, "should be able to reload existing process")
state, _ = rs.GetProcessState(process.ID, "", "")
state, _ = rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
rs.StopProcess(process.ID, "", "")
rs.StopProcess(tid)
}
func TestProbeProcess(t *testing.T) {
@@ -441,10 +467,11 @@ func TestProbeProcess(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
probe := rs.ProbeWithTimeout(process.ID, "", "", 5*time.Second)
probe := rs.ProbeWithTimeout(tid, 5*time.Second)
require.Equal(t, 3, len(probe.Streams))
}
@@ -454,17 +481,18 @@ func TestProcessMetadata(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
data, err := rs.GetProcessMetadata(process.ID, "", "", "foobar")
data, err := rs.GetProcessMetadata(tid, "foobar")
require.Equal(t, ErrMetadataKeyNotFound, err)
require.Equal(t, nil, data, "nothing should be stored under the key")
err = rs.SetProcessMetadata(process.ID, "", "", "foobar", process)
err = rs.SetProcessMetadata(tid, "foobar", process)
require.NoError(t, err)
data, err = rs.GetProcessMetadata(process.ID, "", "", "foobar")
data, err = rs.GetProcessMetadata(tid, "foobar")
require.NoError(t, err)
require.NotEqual(t, nil, data, "there should be something stored under the key")
@@ -478,29 +506,30 @@ func TestLog(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
rs.AddProcess(process)
_, err = rs.GetProcessLog("foobar", "", "")
_, err = rs.GetProcessLog(TaskID{ID: "foobar"})
require.Error(t, err)
log, err := rs.GetProcessLog(process.ID, "", "")
log, err := rs.GetProcessLog(tid)
require.NoError(t, err)
require.Equal(t, 0, len(log.Prelude))
require.Equal(t, 0, len(log.Log))
rs.StartProcess(process.ID, "", "")
rs.StartProcess(tid)
time.Sleep(3 * time.Second)
log, _ = rs.GetProcessLog(process.ID, "", "")
log, _ = rs.GetProcessLog(tid)
require.NotEqual(t, 0, len(log.Prelude))
require.NotEqual(t, 0, len(log.Log))
rs.StopProcess(process.ID, "", "")
rs.StopProcess(tid)
log, _ = rs.GetProcessLog(process.ID, "", "")
log, _ = rs.GetProcessLog(tid)
require.NotEqual(t, 0, len(log.Prelude))
require.NotEqual(t, 0, len(log.Log))
@@ -511,25 +540,26 @@ func TestLogTransfer(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
err = rs.AddProcess(process)
require.NoError(t, err)
rs.StartProcess(process.ID, "", "")
rs.StartProcess(tid)
time.Sleep(3 * time.Second)
rs.StopProcess(process.ID, "", "")
rs.StopProcess(tid)
rs.StartProcess(process.ID, "", "")
rs.StopProcess(process.ID, "", "")
rs.StartProcess(tid)
rs.StopProcess(tid)
log, _ := rs.GetProcessLog(process.ID, "", "")
log, _ := rs.GetProcessLog(tid)
require.Equal(t, 1, len(log.History))
err = rs.UpdateProcess(process.ID, "", "", process)
err = rs.UpdateProcess(tid, process)
require.NoError(t, err)
log, _ = rs.GetProcessLog(process.ID, "", "")
log, _ = rs.GetProcessLog(tid)
require.Equal(t, 1, len(log.History))
}
@@ -539,18 +569,19 @@ func TestPlayoutNoRange(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
process.Input[0].Address = "playout:" + process.Input[0].Address
rs.AddProcess(process)
_, err = rs.GetPlayout("foobar", "", "", process.Input[0].ID)
_, err = rs.GetPlayout(TaskID{ID: "foobar"}, process.Input[0].ID)
require.Equal(t, ErrUnknownProcess, err)
_, err = rs.GetPlayout(process.ID, "", "", "foobar")
_, err = rs.GetPlayout(tid, "foobar")
require.NotEqual(t, nil, err, "playout of non-existing input should error")
addr, _ := rs.GetPlayout(process.ID, "", "", process.Input[0].ID)
addr, _ := rs.GetPlayout(tid, process.Input[0].ID)
require.Equal(t, 0, len(addr), "the playout address should be empty if no port range is given")
}
@@ -562,18 +593,19 @@ func TestPlayoutRange(t *testing.T) {
require.NoError(t, err)
process := getDummyProcess()
tid := TaskID{ID: process.ID}
process.Input[0].Address = "playout:" + process.Input[0].Address
rs.AddProcess(process)
_, err = rs.GetPlayout("foobar", "", "", process.Input[0].ID)
_, err = rs.GetPlayout(TaskID{ID: "foobar"}, process.Input[0].ID)
require.Equal(t, ErrUnknownProcess, err)
_, err = rs.GetPlayout(process.ID, "", "", "foobar")
_, err = rs.GetPlayout(tid, "foobar")
require.NotEqual(t, nil, err, "playout of non-existing input should error")
addr, _ := rs.GetPlayout(process.ID, "", "", process.Input[0].ID)
addr, _ := rs.GetPlayout(tid, process.Input[0].ID)
require.NotEqual(t, 0, len(addr), "the playout address should not be empty if a port range is given")
require.Equal(t, "127.0.0.1:3000", addr, "the playout address should be 127.0.0.1:3000")
}
@@ -675,9 +707,9 @@ func TestTeeAddressReference(t *testing.T) {
r := rs.(*restream)
require.Equal(t, "http://example.com/live.m3u8", r.tasks["process2~"].config.Input[0].Address)
require.Equal(t, "http://example.com/live.m3u8", r.tasks["process3~"].config.Input[0].Address)
require.Equal(t, "rtmp://example.com/live.stream?token=123", r.tasks["process4~"].config.Input[0].Address)
require.Equal(t, "http://example.com/live.m3u8", r.tasks[TaskID{ID: "process2"}].config.Input[0].Address)
require.Equal(t, "http://example.com/live.m3u8", r.tasks[TaskID{ID: "process3"}].config.Input[0].Address)
require.Equal(t, "rtmp://example.com/live.stream?token=123", r.tasks[TaskID{ID: "process4"}].config.Input[0].Address)
}
func TestConfigValidation(t *testing.T) {
@@ -984,7 +1016,7 @@ func TestReplacer(t *testing.T) {
StaleTimeout: 0,
}
task, ok := rs.tasks["314159265359~"]
task, ok := rs.tasks[TaskID{ID: "314159265359"}]
require.True(t, ok)
require.Equal(t, process, task.config)
@@ -1004,7 +1036,7 @@ func TestProcessLimit(t *testing.T) {
rs := rsi.(*restream)
task, ok := rs.tasks[process.ID+"~"]
task, ok := rs.tasks[TaskID{ID: process.ID}]
require.True(t, ok)
status := task.ffmpeg.Status()

View File

@@ -1,49 +0,0 @@
package store
import (
"github.com/datarhei/core/v16/restream/app"
)
type StoreData struct {
Version uint64 `json:"version"`
Process map[string]*app.Process `json:"process"`
Metadata struct {
System map[string]interface{} `json:"system"`
Process map[string]map[string]interface{} `json:"process"`
} `json:"metadata"`
}
func NewStoreData() StoreData {
c := StoreData{
Version: version,
}
c.Process = make(map[string]*app.Process)
c.Metadata.System = make(map[string]interface{})
c.Metadata.Process = make(map[string]map[string]interface{})
return c
}
func (c *StoreData) IsEmpty() bool {
if len(c.Process) != 0 {
return false
}
if len(c.Metadata.Process) != 0 {
return false
}
if len(c.Metadata.System) != 0 {
return false
}
return true
}
func (c *StoreData) sanitize() {
if c.Process == nil {
c.Process = make(map[string]*app.Process)
}
}

View File

@@ -1,157 +0,0 @@
package store
import (
gojson "encoding/json"
"fmt"
"os"
"sync"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
)
type JSONConfig struct {
Filesystem fs.Filesystem
Filepath string // Full path to the database file
Logger log.Logger
}
type jsonStore struct {
fs fs.Filesystem
filepath string
logger log.Logger
// Mutex to serialize access to the backend
lock sync.RWMutex
}
// version 4 -> 5:
// process groups have been added. the indices for the maps are only the process IDs in version 4.
// version 5 adds the group name as suffix to the process ID with a "~".
var version uint64 = 5
func NewJSON(config JSONConfig) (Store, error) {
s := &jsonStore{
fs: config.Filesystem,
filepath: config.Filepath,
logger: config.Logger,
}
if len(s.filepath) == 0 {
s.filepath = "/db.json"
}
if s.fs == nil {
return nil, fmt.Errorf("no valid filesystem provided")
}
if s.logger == nil {
s.logger = log.New("")
}
return s, nil
}
func (s *jsonStore) Load() (StoreData, error) {
s.lock.Lock()
defer s.lock.Unlock()
data, err := s.load(s.filepath, version)
if err != nil {
return NewStoreData(), err
}
data.sanitize()
return data, nil
}
func (s *jsonStore) Store(data StoreData) error {
if data.Version != version {
return fmt.Errorf("invalid version (have: %d, want: %d)", data.Version, version)
}
s.lock.RLock()
defer s.lock.RUnlock()
err := s.store(s.filepath, data)
if err != nil {
return fmt.Errorf("failed to store data: %w", err)
}
return nil
}
func (s *jsonStore) store(filepath string, data StoreData) error {
jsondata, err := gojson.MarshalIndent(&data, "", " ")
if err != nil {
return err
}
_, _, err = s.fs.WriteFileSafe(filepath, jsondata)
if err != nil {
return err
}
s.logger.WithField("file", filepath).Debug().Log("Stored data")
return nil
}
type storeVersion struct {
Version uint64 `json:"version"`
}
func (s *jsonStore) load(filepath string, version uint64) (StoreData, error) {
r := NewStoreData()
_, err := s.fs.Stat(filepath)
if err != nil {
if os.IsNotExist(err) {
return r, nil
}
return r, err
}
jsondata, err := s.fs.ReadFile(filepath)
if err != nil {
return r, err
}
var db storeVersion
if err = gojson.Unmarshal(jsondata, &db); err != nil {
return r, json.FormatError(jsondata, err)
}
if db.Version == 4 {
rold := NewStoreData()
if err = gojson.Unmarshal(jsondata, &rold); err != nil {
return r, json.FormatError(jsondata, err)
}
for id, p := range rold.Process {
r.Process[id+"~"] = p
}
for key, p := range rold.Metadata.System {
r.Metadata.System[key] = p
}
for id, p := range rold.Metadata.Process {
r.Metadata.Process[id+"~"] = p
}
} else if db.Version == version {
if err = gojson.Unmarshal(jsondata, &r); err != nil {
return r, json.FormatError(jsondata, err)
}
} else {
return r, fmt.Errorf("unsupported version of the DB file (want: %d, have: %d)", version, db.Version)
}
s.logger.WithField("file", filepath).Debug().Log("Read data")
return r, nil
}

248
restream/store/json/data.go Normal file
View File

@@ -0,0 +1,248 @@
package json
import "github.com/datarhei/core/v16/restream/app"
type ProcessConfigIOCleanup struct {
Pattern string `json:"pattern"`
MaxFiles uint `json:"max_files"`
MaxFileAge uint `json:"max_file_age_seconds"`
PurgeOnDelete bool `json:"purge_on_delete"`
}
func (p *ProcessConfigIOCleanup) Marshal(a *app.ConfigIOCleanup) {
p.Pattern = a.Pattern
p.MaxFiles = a.MaxFiles
p.MaxFileAge = a.MaxFileAge
p.PurgeOnDelete = a.PurgeOnDelete
}
func (p *ProcessConfigIOCleanup) Unmarshal() app.ConfigIOCleanup {
a := app.ConfigIOCleanup{
Pattern: p.Pattern,
MaxFiles: p.MaxFiles,
MaxFileAge: p.MaxFileAge,
PurgeOnDelete: p.PurgeOnDelete,
}
return a
}
type ProcessConfigIO struct {
ID string `json:"id"`
Address string `json:"address"`
Options []string `json:"options"`
Cleanup []ProcessConfigIOCleanup `json:"cleanup"`
}
func (p *ProcessConfigIO) Marshal(a *app.ConfigIO) {
p.ID = a.ID
p.Address = a.Address
p.Options = make([]string, len(a.Options))
copy(p.Options, a.Options)
if len(a.Cleanup) != 0 {
p.Cleanup = make([]ProcessConfigIOCleanup, len(a.Cleanup))
for x, cleanup := range a.Cleanup {
p.Cleanup[x].Marshal(&cleanup)
}
} else {
p.Cleanup = nil
}
}
func (p *ProcessConfigIO) Unmarshal() app.ConfigIO {
a := app.ConfigIO{
ID: p.ID,
Address: p.Address,
}
a.Options = make([]string, len(p.Options))
copy(a.Options, p.Options)
if len(p.Cleanup) != 0 {
a.Cleanup = make([]app.ConfigIOCleanup, len(p.Cleanup))
for x, cleanup := range p.Cleanup {
a.Cleanup[x] = cleanup.Unmarshal()
}
}
return a
}
type ProcessConfig struct {
ID string `json:"id"`
Reference string `json:"reference"`
Owner string `json:"owner"`
Domain string `json:"domain"`
FFVersion string `json:"ffversion"`
Input []ProcessConfigIO `json:"input"`
Output []ProcessConfigIO `json:"output"`
Options []string `json:"options"`
Reconnect bool `json:"reconnect"`
ReconnectDelay uint64 `json:"reconnect_delay_seconds"` // seconds
Autostart bool `json:"autostart"`
StaleTimeout uint64 `json:"stale_timeout_seconds"` // seconds
LimitCPU float64 `json:"limit_cpu_usage"` // percent
LimitMemory uint64 `json:"limit_memory_bytes"` // bytes
LimitWaitFor uint64 `json:"limit_waitfor_seconds"` // seconds
}
func (p *ProcessConfig) Marshal(a *app.Config) {
p.ID = a.ID
p.Reference = a.Reference
p.Owner = a.Owner
p.Domain = a.Domain
p.FFVersion = a.FFVersion
p.Reconnect = a.Reconnect
p.ReconnectDelay = a.ReconnectDelay
p.Autostart = a.Autostart
p.StaleTimeout = a.StaleTimeout
p.LimitCPU = a.LimitCPU
p.LimitMemory = a.LimitMemory
p.LimitWaitFor = a.LimitWaitFor
p.Options = make([]string, len(a.Options))
copy(p.Options, a.Options)
p.Input = make([]ProcessConfigIO, len(a.Input))
for x, input := range a.Input {
p.Input[x].Marshal(&input)
}
p.Output = make([]ProcessConfigIO, len(a.Output))
for x, output := range a.Output {
p.Output[x].Marshal(&output)
}
}
func (p *ProcessConfig) Unmarshal() *app.Config {
a := &app.Config{
ID: p.ID,
Reference: p.Reference,
Owner: p.Owner,
Domain: p.Domain,
FFVersion: p.FFVersion,
Input: []app.ConfigIO{},
Output: []app.ConfigIO{},
Options: []string{},
Reconnect: p.Reconnect,
ReconnectDelay: p.ReconnectDelay,
Autostart: p.Autostart,
StaleTimeout: p.StaleTimeout,
LimitCPU: p.LimitCPU,
LimitMemory: p.LimitMemory,
LimitWaitFor: p.LimitWaitFor,
}
a.Options = make([]string, len(p.Options))
copy(a.Options, p.Options)
a.Input = make([]app.ConfigIO, len(p.Input))
for x, input := range p.Input {
a.Input[x] = input.Unmarshal()
}
a.Output = make([]app.ConfigIO, len(p.Output))
for x, output := range p.Output {
a.Output[x] = output.Unmarshal()
}
return a
}
type Process struct {
ID string `json:"id"`
Owner string `json:"owner"`
Domain string `json:"domain"`
Reference string `json:"reference"`
Config ProcessConfig `json:"config"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
Order string `json:"order"`
}
func MarshalProcess(a *app.Process) Process {
p := Process{
ID: a.ID,
Owner: a.Owner,
Domain: a.Domain,
Reference: a.Reference,
Config: ProcessConfig{},
CreatedAt: a.CreatedAt,
UpdatedAt: a.UpdatedAt,
Order: a.Order,
}
p.Config.Marshal(a.Config)
return p
}
func UnmarshalProcess(p Process) *app.Process {
a := &app.Process{
ID: p.ID,
Owner: p.Owner,
Domain: p.Domain,
Reference: p.Reference,
Config: &app.Config{},
CreatedAt: p.CreatedAt,
UpdatedAt: p.UpdatedAt,
Order: p.Order,
}
a.Config = p.Config.Unmarshal()
return a
}
type Domain struct {
Process map[string]Process `json:"process"`
Metadata map[string]map[string]interface{} `json:"metadata"`
}
type Data struct {
Version uint64 `json:"version"`
Process map[string]Process `json:"process"`
Domain map[string]Domain `json:"domain"`
Metadata struct {
System map[string]interface{} `json:"system"`
Process map[string]map[string]interface{} `json:"process"`
} `json:"metadata"`
}
var version uint64 = 4
func NewData() Data {
c := Data{
Version: version,
}
c.Process = make(map[string]Process)
c.Domain = make(map[string]Domain)
c.Metadata.System = make(map[string]interface{})
c.Metadata.Process = make(map[string]map[string]interface{})
return c
}
func (c *Data) IsEmpty() bool {
if len(c.Process) != 0 {
return false
}
if len(c.Domain) != 0 {
return false
}
if len(c.Metadata.Process) != 0 {
return false
}
if len(c.Metadata.System) != 0 {
return false
}
return true
}

214
restream/store/json/json.go Normal file
View File

@@ -0,0 +1,214 @@
package json
import (
gojson "encoding/json"
"fmt"
"os"
"sync"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/store"
)
type Config struct {
Filesystem fs.Filesystem
Filepath string // Full path to the database file
Logger log.Logger
}
type jsonStore struct {
fs fs.Filesystem
filepath string
logger log.Logger
// Mutex to serialize access to the disk
lock sync.RWMutex
}
func New(config Config) (store.Store, error) {
s := &jsonStore{
fs: config.Filesystem,
filepath: config.Filepath,
logger: config.Logger,
}
if len(s.filepath) == 0 {
s.filepath = "/db.json"
}
if s.fs == nil {
return nil, fmt.Errorf("no valid filesystem provided")
}
if s.logger == nil {
s.logger = log.New("")
}
return s, nil
}
func (s *jsonStore) Load() (store.Data, error) {
s.lock.Lock()
defer s.lock.Unlock()
data := store.NewData()
d, err := s.load(s.filepath, version)
if err != nil {
return data, err
}
for _, process := range d.Process {
if data.Process[""] == nil {
data.Process[""] = map[string]store.Process{}
}
p := data.Process[""][process.ID]
p.Process = UnmarshalProcess(process)
p.Metadata = map[string]interface{}{}
data.Process[""][process.ID] = p
}
for pid, m := range d.Metadata.Process {
if data.Process[""] == nil {
data.Process[""] = map[string]store.Process{}
}
p := data.Process[""][pid]
p.Metadata = m
data.Process[""][pid] = p
}
for k, v := range d.Metadata.System {
data.Metadata[k] = v
}
for name, domain := range d.Domain {
if data.Process[name] == nil {
data.Process[name] = map[string]store.Process{}
}
for pid, process := range domain.Process {
p := data.Process[name][pid]
p.Process = UnmarshalProcess(process)
p.Metadata = map[string]interface{}{}
data.Process[name][pid] = p
}
for pid, m := range domain.Metadata {
p := data.Process[name][pid]
p.Metadata = m
data.Process[name][pid] = p
}
}
return data, nil
}
func (s *jsonStore) Store(data store.Data) error {
r := NewData()
for k, v := range data.Metadata {
r.Metadata.System[k] = v
}
for domain, d := range data.Process {
for pid, p := range d {
if len(domain) == 0 {
r.Process[pid] = MarshalProcess(p.Process)
r.Metadata.Process[pid] = p.Metadata
} else {
x := r.Domain[domain]
if x.Process == nil {
x.Process = map[string]Process{}
}
x.Process[pid] = MarshalProcess(p.Process)
if x.Metadata == nil {
x.Metadata = map[string]map[string]interface{}{}
}
x.Metadata[pid] = p.Metadata
r.Domain[domain] = x
}
}
}
s.lock.RLock()
defer s.lock.RUnlock()
err := s.store(s.filepath, r)
if err != nil {
return fmt.Errorf("failed to store data: %w", err)
}
return nil
}
func (s *jsonStore) store(filepath string, data Data) error {
if data.Version != version {
return fmt.Errorf("invalid version (have: %d, want: %d)", data.Version, version)
}
jsondata, err := gojson.MarshalIndent(&data, "", " ")
if err != nil {
return err
}
_, _, err = s.fs.WriteFileSafe(filepath, jsondata)
if err != nil {
return err
}
s.logger.WithField("file", filepath).Debug().Log("Stored data")
return nil
}
type storeVersion struct {
Version uint64 `json:"version"`
}
func (s *jsonStore) load(filepath string, version uint64) (Data, error) {
r := NewData()
_, err := s.fs.Stat(filepath)
if err != nil {
if os.IsNotExist(err) {
return r, nil
}
return r, err
}
jsondata, err := s.fs.ReadFile(filepath)
if err != nil {
return r, err
}
var db storeVersion
if err = gojson.Unmarshal(jsondata, &db); err != nil {
return r, json.FormatError(jsondata, err)
}
if db.Version == version {
if err = gojson.Unmarshal(jsondata, &r); err != nil {
return r, json.FormatError(jsondata, err)
}
} else {
return r, fmt.Errorf("unsupported version of the DB file (want: %d, have: %d)", version, db.Version)
}
s.logger.WithField("file", filepath).Debug().Log("Read data")
return r, nil
}

View File

@@ -0,0 +1,184 @@
package json
import (
"testing"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/store"
"github.com/stretchr/testify/require"
)
func getFS(t *testing.T) fs.Filesystem {
fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: ".",
})
require.NoError(t, err)
info, err := fs.Stat("./fixtures/v4_empty.json")
require.NoError(t, err)
require.Equal(t, "/fixtures/v4_empty.json", info.Name())
return fs
}
func TestNew(t *testing.T) {
store, err := New(Config{
Filesystem: getFS(t),
})
require.NoError(t, err)
require.NotEmpty(t, store)
}
func TestStoreLoad(t *testing.T) {
memfs, err := fs.NewMemFilesystem(fs.MemConfig{})
require.NoError(t, err)
jsonstore, err := New(Config{
Filesystem: memfs,
Filepath: "./db.json",
})
require.NoError(t, err)
data := store.NewData()
data.Process[""] = make(map[string]store.Process)
p := store.Process{
Process: &app.Process{
ID: "foobar",
Owner: "me",
Domain: "",
Reference: "ref",
Config: &app.Config{
ID: "foobar",
Reference: "ref",
Owner: "me",
Domain: "",
FFVersion: "5.1.3",
Input: []app.ConfigIO{},
Output: []app.ConfigIO{},
Options: []string{
"42",
},
Reconnect: true,
ReconnectDelay: 14,
Autostart: true,
StaleTimeout: 1,
LimitCPU: 2,
LimitMemory: 3,
LimitWaitFor: 4,
},
CreatedAt: 0,
UpdatedAt: 0,
Order: "stop",
},
Metadata: map[string]interface{}{
"some": "data",
},
}
data.Process[""]["foobar"] = p
data.Process["domain"] = make(map[string]store.Process)
p = store.Process{
Process: &app.Process{
ID: "foobaz",
Owner: "you",
Domain: "domain",
Reference: "refref",
Config: &app.Config{
ID: "foobaz",
Reference: "refref",
Owner: "you",
Domain: "domain",
FFVersion: "5.1.4",
Input: []app.ConfigIO{},
Output: []app.ConfigIO{},
Options: []string{
"47",
},
Reconnect: true,
ReconnectDelay: 24,
Autostart: true,
StaleTimeout: 21,
LimitCPU: 22,
LimitMemory: 23,
LimitWaitFor: 24,
},
CreatedAt: 0,
UpdatedAt: 0,
Order: "stop",
},
Metadata: map[string]interface{}{
"some-more": "data",
},
}
data.Process["domain"]["foobaz"] = p
data.Metadata["foo"] = "bar"
err = jsonstore.Store(data)
require.NoError(t, err)
d, err := jsonstore.Load()
require.NoError(t, err)
require.Equal(t, data, d)
}
func TestLoad(t *testing.T) {
store, err := New(Config{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_empty.json",
})
require.NoError(t, err)
_, err = store.Load()
require.NoError(t, err)
}
func TestLoadFailed(t *testing.T) {
store, err := New(Config{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_invalid.json",
})
require.NoError(t, err)
_, err = store.Load()
require.Error(t, err)
}
func TestIsEmpty(t *testing.T) {
store, err := New(Config{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_empty.json",
})
require.NoError(t, err)
data, err := store.Load()
require.NoError(t, err)
require.Equal(t, true, len(data.Process) == 0)
}
func TestNotExists(t *testing.T) {
store, err := New(Config{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_notexist.json",
})
require.NoError(t, err)
data, err := store.Load()
require.NoError(t, err)
require.Equal(t, true, len(data.Process) == 0)
}
func TestInvalidVersion(t *testing.T) {
store, err := New(Config{
Filesystem: getFS(t),
Filepath: "./fixtures/v3_empty.json",
})
require.NoError(t, err)
data, err := store.Load()
require.Error(t, err)
require.Equal(t, true, len(data.Process) == 0)
}

View File

@@ -1,113 +0,0 @@
package store
import (
"testing"
"github.com/datarhei/core/v16/io/fs"
"github.com/stretchr/testify/require"
)
func getFS(t *testing.T) fs.Filesystem {
fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: ".",
})
require.NoError(t, err)
info, err := fs.Stat("./fixtures/v4_empty.json")
require.NoError(t, err)
require.Equal(t, "/fixtures/v4_empty.json", info.Name())
return fs
}
func TestNew(t *testing.T) {
store, err := NewJSON(JSONConfig{
Filesystem: getFS(t),
})
require.NoError(t, err)
require.NotEmpty(t, store)
}
func TestLoad(t *testing.T) {
store, err := NewJSON(JSONConfig{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_empty.json",
})
require.NoError(t, err)
_, err = store.Load()
require.NoError(t, err)
}
func TestLoadFailed(t *testing.T) {
store, err := NewJSON(JSONConfig{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_invalid.json",
})
require.NoError(t, err)
_, err = store.Load()
require.Error(t, err)
}
func TestIsEmpty(t *testing.T) {
store, err := NewJSON(JSONConfig{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_empty.json",
})
require.NoError(t, err)
data, err := store.Load()
require.NoError(t, err)
require.Equal(t, true, data.IsEmpty())
}
func TestNotExists(t *testing.T) {
store, err := NewJSON(JSONConfig{
Filesystem: getFS(t),
Filepath: "./fixtures/v4_notexist.json",
})
require.NoError(t, err)
data, err := store.Load()
require.NoError(t, err)
require.Equal(t, true, data.IsEmpty())
}
func TestStore(t *testing.T) {
fs := getFS(t)
fs.Remove("./fixtures/v5_store.json")
store, err := NewJSON(JSONConfig{
Filesystem: fs,
Filepath: "./fixtures/v5_store.json",
})
require.NoError(t, err)
data, err := store.Load()
require.NoError(t, err)
require.Equal(t, true, data.IsEmpty())
data.Metadata.System["somedata"] = "foobar"
err = store.Store(data)
require.NoError(t, err)
data2, err := store.Load()
require.NoError(t, err)
require.Equal(t, data, data2)
fs.Remove("./fixtures/v5_store.json")
}
func TestInvalidVersion(t *testing.T) {
store, err := NewJSON(JSONConfig{
Filesystem: getFS(t),
Filepath: "./fixtures/v3_empty.json",
})
require.NoError(t, err)
data, err := store.Load()
require.Error(t, err)
require.Equal(t, true, data.IsEmpty())
}

View File

@@ -1,9 +1,42 @@
package store
import "github.com/datarhei/core/v16/restream/app"
type Process struct {
Process *app.Process
Metadata map[string]interface{}
}
type Data struct {
Process map[string]map[string]Process
Metadata map[string]interface{}
}
func (d *Data) IsEmpty() bool {
if len(d.Process) != 0 {
return false
}
if len(d.Metadata) != 0 {
return false
}
return true
}
type Store interface {
// Load data from the store
Load() (StoreData, error)
Load() (Data, error)
// Save data to the store
Store(data StoreData) error
Store(Data) error
}
func NewData() Data {
c := Data{
Process: make(map[string]map[string]Process),
Metadata: make(map[string]interface{}),
}
return c
}