Merge branch 'dev' into cluster

This commit is contained in:
Ingo Oppermann
2023-05-03 11:17:54 +02:00
18 changed files with 196 additions and 52 deletions

View File

@@ -1,3 +1,3 @@
# CORE NVIDIA CUDA BUNDLE # CORE NVIDIA CUDA BUNDLE
FFMPEG_VERSION=5.1.2 FFMPEG_VERSION=5.1.3
CUDA_VERSION=11.7.1 CUDA_VERSION=11.7.1

View File

@@ -1,2 +1,2 @@
# CORE BUNDLE # CORE BUNDLE
FFMPEG_VERSION=5.1.2 FFMPEG_VERSION=5.1.3

View File

@@ -1,2 +1,2 @@
# CORE RASPBERRY-PI BUNDLE # CORE RASPBERRY-PI BUNDLE
FFMPEG_VERSION=5.1.2 FFMPEG_VERSION=5.1.3

View File

@@ -1,2 +1,2 @@
# CORE BUNDLE # CORE BUNDLE
FFMPEG_VERSION=5.1.2 FFMPEG_VERSION=5.1.3

View File

@@ -2,10 +2,18 @@
### Core v16.12.0 > v16.?.? ### Core v16.12.0 > v16.?.?
- Add updated_at field in process infos
- Add preserve process log history when updating a process
- Add support for input framerate data from jsonstats patch
- Add number of keyframes and extradata size to process progress data
- Mod bumps FFmpeg to v5.1.3 (datarhei/core:tag bundles)
- Fix better naming for storage endpoint documentation - Fix better naming for storage endpoint documentation
- Fix freeing up S3 mounts - Fix freeing up S3 mounts
- Fix URL validation if the path contains FFmpeg specific placeholders - Fix URL validation if the path contains FFmpeg specific placeholders
- Fix purging default file from HTTP cache - Fix purging default file from HTTP cache
- Fix parsing S3 storage definition from environment variable
- Fix checking length of CPU time array ([#10](https://github.com/datarhei/core/issues/10))
- Deprecate ENV names that do not correspond to JSON name
### Core v16.11.0 > v16.12.0 ### Core v16.11.0 > v16.12.0

View File

@@ -15,11 +15,11 @@ init:
## build: Build core (default) ## build: Build core (default)
build: build:
CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o core${BINSUFFIX} CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o core${BINSUFFIX} -trimpath
# github workflow workaround # github workflow workaround
build_linux: build_linux:
CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o core CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o core -trimpath
## swagger: Update swagger API documentation (requires github.com/swaggo/swag) ## swagger: Update swagger API documentation (requires github.com/swaggo/swag)
swagger: swagger:
@@ -69,19 +69,19 @@ lint:
## import: Build import binary ## import: Build import binary
import: import:
cd app/import && CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o ../../import -ldflags="-s -w" cd app/import && CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o ../../import -trimpath -ldflags="-s -w"
# github workflow workaround # github workflow workaround
import_linux: import_linux:
cd app/import && CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o ../../import -ldflags="-s -w" cd app/import && CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o ../../import -trimpath -ldflags="-s -w"
## ffmigrate: Build ffmpeg migration binary ## ffmigrate: Build ffmpeg migration binary
ffmigrate: ffmigrate:
cd app/ffmigrate && CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o ../../ffmigrate -ldflags="-s -w" cd app/ffmigrate && CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o ../../ffmigrate -trimpath -ldflags="-s -w"
# github workflow workaround # github workflow workaround
ffmigrate_linux: ffmigrate_linux:
cd app/ffmigrate && CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o ../../ffmigrate -ldflags="-s -w" cd app/ffmigrate && CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o ../../ffmigrate -trimpath -ldflags="-s -w"
## coverage: Generate code coverage analysis ## coverage: Generate code coverage analysis
coverage: coverage:
@@ -94,11 +94,11 @@ commit: vet fmt lint test build
## release: Build a release binary of core ## release: Build a release binary of core
release: release:
CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o core -ldflags="-s -w -X github.com/datarhei/core/v16/app.Commit=$(COMMIT) -X github.com/datarhei/core/v16/app.Branch=$(BRANCH) -X github.com/datarhei/core/v16/app.Build=$(BUILD)" CGO_ENABLED=${CGO_ENABLED} GOOS=${GOOS} GOARCH=${GOARCH} go build -o core -trimpath -ldflags="-s -w -X github.com/datarhei/core/v16/app.Commit=$(COMMIT) -X github.com/datarhei/core/v16/app.Branch=$(BRANCH) -X github.com/datarhei/core/v16/app.Build=$(BUILD)"
# github workflow workaround # github workflow workaround
release_linux: release_linux:
CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o core -ldflags="-s -w -X github.com/datarhei/core/v16/app.Commit=$(COMMIT) -X github.com/datarhei/core/v16/app.Branch=$(BRANCH) -X github.com/datarhei/core/v16/app.Build=$(BUILD)" CGO_ENABLED=0 GOOS=linux GOARCH=${OSARCH} go build -o core -trimpath -ldflags="-s -w -X github.com/datarhei/core/v16/app.Commit=$(COMMIT) -X github.com/datarhei/core/v16/app.Branch=$(BRANCH) -X github.com/datarhei/core/v16/app.Build=$(BUILD)"
## docker: Build standard Docker image ## docker: Build standard Docker image
docker: docker:

View File

@@ -32,6 +32,9 @@ type ProcessConfig struct {
Reconnect bool Reconnect bool
ReconnectDelay time.Duration ReconnectDelay time.Duration
StaleTimeout time.Duration StaleTimeout time.Duration
LimitCPU float64
LimitMemory uint64
LimitDuration time.Duration
Command []string Command []string
Parser process.Parser Parser process.Parser
Logger log.Logger Logger log.Logger
@@ -117,6 +120,9 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
Reconnect: config.Reconnect, Reconnect: config.Reconnect,
ReconnectDelay: config.ReconnectDelay, ReconnectDelay: config.ReconnectDelay,
StaleTimeout: config.StaleTimeout, StaleTimeout: config.StaleTimeout,
LimitCPU: config.LimitCPU,
LimitMemory: config.LimitMemory,
LimitDuration: config.LimitDuration,
Parser: config.Parser, Parser: config.Parser,
Logger: config.Logger, Logger: config.Logger,
OnStart: config.OnStart, OnStart: config.OnStart,

View File

@@ -33,6 +33,9 @@ type Parser interface {
// ReportHistory returns an array of previews logs // ReportHistory returns an array of previews logs
ReportHistory() []Report ReportHistory() []Report
// TransferReportHistory transfers the report history to another parser
TransferReportHistory(Parser) error
} }
// Config is the config for the Parser implementation // Config is the config for the Parser implementation
@@ -767,3 +770,21 @@ func (p *parser) ReportHistory() []Report {
return history return history
} }
func (p *parser) TransferReportHistory(dst Parser) error {
pp, ok := dst.(*parser)
if !ok {
return fmt.Errorf("the target parser is not of the required type")
}
p.logHistory.Do(func(l interface{}) {
if l == nil {
return
}
pp.logHistory.Value = l
pp.logHistory = pp.logHistory.Next()
})
return nil
}

View File

@@ -14,6 +14,7 @@ type Process struct {
Type string `json:"type" jsonschema:"enum=ffmpeg"` Type string `json:"type" jsonschema:"enum=ffmpeg"`
Reference string `json:"reference"` Reference string `json:"reference"`
CreatedAt int64 `json:"created_at" jsonschema:"minimum=0" format:"int64"` CreatedAt int64 `json:"created_at" jsonschema:"minimum=0" format:"int64"`
UpdatedAt int64 `json:"updated_at" jsonschema:"minimum=0" format:"int64"`
Config *ProcessConfig `json:"config,omitempty"` Config *ProcessConfig `json:"config,omitempty"`
State *ProcessState `json:"state,omitempty"` State *ProcessState `json:"state,omitempty"`
Report *ProcessReport `json:"report,omitempty"` Report *ProcessReport `json:"report,omitempty"`

View File

@@ -544,6 +544,7 @@ func (h *RestreamHandler) getProcess(id, filterString string) (api.Process, erro
Reference: process.Reference, Reference: process.Reference,
Type: "ffmpeg", Type: "ffmpeg",
CreatedAt: process.CreatedAt, CreatedAt: process.CreatedAt,
UpdatedAt: process.UpdatedAt,
} }
if wants["config"] { if wants["config"] {

View File

@@ -7,6 +7,7 @@ import (
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"path"
urlpath "path" urlpath "path"
"path/filepath" "path/filepath"
"regexp" "regexp"
@@ -298,7 +299,7 @@ func (r *bodyReader) Close() error {
func (r *bodyReader) getSegments(dir string) []string { func (r *bodyReader) getSegments(dir string) []string {
segments := []string{} segments := []string{}
// Find all segments URLS in the .m3u8 // Find all segment URLs in the .m3u8
scanner := bufio.NewScanner(&r.buffer) scanner := bufio.NewScanner(&r.buffer)
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
@@ -405,9 +406,23 @@ func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL) {
q := u.Query() q := u.Query()
loop := false
// If this is a master manifest (i.e. an m3u8 which contains references to other m3u8), then // If this is a master manifest (i.e. an m3u8 which contains references to other m3u8), then
// we give each substream an own session ID if they don't have already. // we give each substream an own session ID if they don't have already.
if strings.HasSuffix(u.Path, ".m3u8") { if strings.HasSuffix(u.Path, ".m3u8") {
// Check if we're referring to ourselves. This will cause an infinite loop
// and has to be stopped.
file := u.Path
if !strings.HasPrefix(file, "/") {
dir := path.Dir(requestURL.Path)
file = filepath.Join(dir, file)
}
if requestURL.Path == file {
loop = true
}
q.Set("session", shortuuid.New()) q.Set("session", shortuuid.New())
isMaster = true isMaster = true
@@ -417,8 +432,12 @@ func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL) {
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
if loop {
buffer.WriteString("# m3u8 is referencing itself: " + u.String() + "\n")
} else {
buffer.WriteString(u.String() + "\n") buffer.WriteString(u.String() + "\n")
} }
}
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
return return

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"runtime" "runtime"
"runtime/debug"
"strings" "strings"
"time" "time"
) )
@@ -99,6 +100,7 @@ type Logger interface {
type logger struct { type logger struct {
output Writer output Writer
component string component string
modulePath string
} }
// New returns an implementation of the Logger interface. // New returns an implementation of the Logger interface.
@@ -107,6 +109,10 @@ func New(component string) Logger {
component: component, component: component,
} }
if info, ok := debug.ReadBuildInfo(); ok {
l.modulePath = info.Path
}
return l return l
} }
@@ -114,6 +120,7 @@ func (l *logger) clone() *logger {
clone := &logger{ clone := &logger{
output: l.output, output: l.output,
component: l.component, component: l.component,
modulePath: l.modulePath,
} }
return clone return clone
@@ -209,6 +216,7 @@ func (e *Event) WithComponent(component string) Logger {
func (e *Event) Log(format string, args ...interface{}) { func (e *Event) Log(format string, args ...interface{}) {
_, file, line, _ := runtime.Caller(1) _, file, line, _ := runtime.Caller(1)
file = strings.TrimPrefix(file, e.logger.modulePath)
n := e.clone() n := e.clone()

View File

@@ -55,7 +55,7 @@ func TestScheme(t *testing.T) {
require.False(t, r) require.False(t, r)
} }
func TestPars(t *testing.T) { func TestParse(t *testing.T) {
u, err := Parse("http://localhost/foobar") u, err := Parse("http://localhost/foobar")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &URL{ require.Equal(t, &URL{

View File

@@ -63,26 +63,19 @@ type Config struct {
// Status represents the current status of a process // Status represents the current status of a process
type Status struct { type Status struct {
// State is the current state of the process. See stateType for the known states. State string // State is the current state of the process. See stateType for the known states.
State string States States // States is the cumulative history of states the process had.
Order string // Order is the wanted condition of process, either "start" or "stop"
// States is the cumulative history of states the process had. Duration time.Duration // Duration is the time since the last change of the state
States States Time time.Time // Time is the time of the last change of the state
CPU struct {
// Order is the wanted condition of process, either "start" or "stop" Current float64 // Used CPU in percent
Order string Limit float64 // Limit in percent
}
// Duration is the time since the last change of the state Memory struct {
Duration time.Duration Current uint64 // Used memory in bytes
Limit uint64 // Limit in bytes
// Time is the time of the last change of the state }
Time time.Time
// Used CPU in percent
CPU float64
// Used memory in bytes
Memory uint64
} }
// States // States
@@ -390,6 +383,7 @@ func (p *process) getStateString() string {
// Status returns the current status of the process // Status returns the current status of the process
func (p *process) Status() Status { func (p *process) Status() Status {
cpu, memory := p.limits.Current() cpu, memory := p.limits.Current()
cpuLimit, memoryLimit := p.limits.Limits()
p.state.lock.Lock() p.state.lock.Lock()
stateTime := p.state.time stateTime := p.state.time
@@ -407,10 +401,14 @@ func (p *process) Status() Status {
Order: order, Order: order,
Duration: time.Since(stateTime), Duration: time.Since(stateTime),
Time: stateTime, Time: stateTime,
CPU: cpu,
Memory: memory,
} }
s.CPU.Current = cpu
s.CPU.Limit = cpuLimit
s.Memory.Current = memory
s.Memory.Limit = memoryLimit
return s return s
} }
@@ -593,6 +591,7 @@ func (p *process) stop(wait bool) error {
if p.callbacks.onExit == nil { if p.callbacks.onExit == nil {
p.callbacks.onExit = func() { p.callbacks.onExit = func() {
wg.Done() wg.Done()
p.callbacks.onExit = nil p.callbacks.onExit = nil
} }
} else { } else {
@@ -600,6 +599,7 @@ func (p *process) stop(wait bool) error {
p.callbacks.onExit = func() { p.callbacks.onExit = func() {
cb() cb()
wg.Done() wg.Done()
p.callbacks.onExit = cb p.callbacks.onExit = cb
} }
} }

View File

@@ -106,6 +106,7 @@ type Process struct {
Reference string `json:"reference"` Reference string `json:"reference"`
Config *Config `json:"config"` Config *Config `json:"config"`
CreatedAt int64 `json:"created_at"` CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
Order string `json:"order"` Order string `json:"order"`
} }
@@ -115,6 +116,7 @@ func (process *Process) Clone() *Process {
Reference: process.Reference, Reference: process.Reference,
Config: process.Config.Clone(), Config: process.Config.Clone(),
CreatedAt: process.CreatedAt, CreatedAt: process.CreatedAt,
UpdatedAt: process.UpdatedAt,
Order: process.Order, Order: process.Order,
} }

View File

@@ -78,7 +78,7 @@ func TestMaxAge(t *testing.T) {
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
return cleanfs.Files() == 0 return cleanfs.Files() == 0
}, 5*time.Second, time.Second) }, 10*time.Second, time.Second)
cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3")) cleanfs.WriteFileReader("/chunk_3.ts", strings.NewReader("chunk_3"))
@@ -96,7 +96,7 @@ func TestMaxAge(t *testing.T) {
require.ElementsMatch(t, []string{"/chunk_3.ts"}, names) require.ElementsMatch(t, []string{"/chunk_3.ts"}, names)
return true return true
}, 2*time.Second, time.Second) }, 5*time.Second, time.Second)
cleanfs.Stop() cleanfs.Stop()
} }

View File

@@ -355,6 +355,9 @@ func (r *restream) load() error {
Reconnect: t.config.Reconnect, Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
Command: t.command, Command: t.command,
Parser: t.parser, Parser: t.parser,
Logger: t.logger, Logger: t.logger,
@@ -456,6 +459,8 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
CreatedAt: time.Now().Unix(), CreatedAt: time.Now().Unix(),
} }
process.UpdatedAt = process.CreatedAt
if config.Autostart { if config.Autostart {
process.Order = "start" process.Order = "start"
} }
@@ -492,6 +497,9 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
Reconnect: t.config.Reconnect, Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
Command: t.command, Command: t.command,
Parser: t.parser, Parser: t.parser,
Logger: t.logger, Logger: t.logger,
@@ -867,6 +875,10 @@ func (r *restream) UpdateProcess(id string, config *app.Config) error {
return ErrUnknownProcess return ErrUnknownProcess
} }
// This would require a major version jump
//t.process.CreatedAt = task.process.CreatedAt
t.process.UpdatedAt = time.Now().Unix()
task.parser.TransferReportHistory(t.parser)
t.process.Order = task.process.Order t.process.Order = task.process.Order
if id != t.id { if id != t.id {
@@ -1173,6 +1185,9 @@ func (r *restream) reloadProcess(id string) error {
Reconnect: t.config.Reconnect, Reconnect: t.config.Reconnect,
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
Command: t.command, Command: t.command,
Parser: t.parser, Parser: t.parser,
Logger: t.logger, Logger: t.logger,
@@ -1212,8 +1227,8 @@ func (r *restream) GetProcessState(id string) (*app.State, error) {
state.State = status.State state.State = status.State
state.States.Marshal(status.States) state.States.Marshal(status.States)
state.Time = status.Time.Unix() state.Time = status.Time.Unix()
state.Memory = status.Memory state.Memory = status.Memory.Current
state.CPU = status.CPU state.CPU = status.CPU.Current
state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds() state.Duration = status.Duration.Round(10 * time.Millisecond).Seconds()
state.Reconnect = -1 state.Reconnect = -1
state.Command = make([]string, len(task.command)) state.Command = make([]string, len(task.command))

View File

@@ -22,6 +22,7 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
ffmpeg, err := ffmpeg.New(ffmpeg.Config{ ffmpeg, err := ffmpeg.New(ffmpeg.Config{
Binary: binary, Binary: binary,
LogHistoryLength: 3,
Portrange: portrange, Portrange: portrange,
ValidatorInput: validatorIn, ValidatorInput: validatorIn,
ValidatorOutput: validatorOut, ValidatorOutput: validatorOut,
@@ -215,6 +216,14 @@ func TestUpdateProcess(t *testing.T) {
err = rs.AddProcess(process2) err = rs.AddProcess(process2)
require.Equal(t, nil, err) require.Equal(t, nil, err)
process, err := rs.GetProcess(process2.ID)
require.NoError(t, err)
createdAt := process.CreatedAt
updatedAt := process.UpdatedAt
time.Sleep(2 * time.Second)
process3 := getDummyProcess() process3 := getDummyProcess()
require.NotNil(t, process3) require.NotNil(t, process3)
process3.ID = "process2" process3.ID = "process2"
@@ -229,8 +238,11 @@ func TestUpdateProcess(t *testing.T) {
_, err = rs.GetProcess(process1.ID) _, err = rs.GetProcess(process1.ID)
require.Error(t, err) require.Error(t, err)
_, err = rs.GetProcess(process3.ID) process, err = rs.GetProcess(process3.ID)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, createdAt, process.CreatedAt) // this should be equal, but will require a major version jump
require.NotEqual(t, updatedAt, process.UpdatedAt)
} }
func TestGetProcess(t *testing.T) { func TestGetProcess(t *testing.T) {
@@ -437,10 +449,10 @@ func TestLog(t *testing.T) {
rs.AddProcess(process) rs.AddProcess(process)
_, err = rs.GetProcessLog("foobar") _, err = rs.GetProcessLog("foobar")
require.NotEqual(t, nil, err, "shouldn't be able to get log from non-existing process") require.Error(t, err)
log, err := rs.GetProcessLog(process.ID) log, err := rs.GetProcessLog(process.ID)
require.Equal(t, nil, err, "should be able to get log from existing process") require.NoError(t, err)
require.Equal(t, 0, len(log.Prelude)) require.Equal(t, 0, len(log.Prelude))
require.Equal(t, 0, len(log.Log)) require.Equal(t, 0, len(log.Log))
@@ -461,6 +473,34 @@ func TestLog(t *testing.T) {
require.NotEqual(t, 0, len(log.Log)) require.NotEqual(t, 0, len(log.Log))
} }
func TestLogTransfer(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
err = rs.AddProcess(process)
require.NoError(t, err)
rs.StartProcess(process.ID)
time.Sleep(3 * time.Second)
rs.StopProcess(process.ID)
rs.StartProcess(process.ID)
rs.StopProcess(process.ID)
log, _ := rs.GetProcessLog(process.ID)
require.Equal(t, 1, len(log.History))
err = rs.UpdateProcess(process.ID, process)
require.NoError(t, err)
log, _ = rs.GetProcessLog(process.ID)
require.Equal(t, 1, len(log.History))
}
func TestPlayoutNoRange(t *testing.T) { func TestPlayoutNoRange(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil) rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@@ -843,3 +883,26 @@ func TestReplacer(t *testing.T) {
require.Equal(t, process, rs.tasks["314159265359"].config) require.Equal(t, process, rs.tasks["314159265359"].config)
} }
func TestProcessLimit(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
process.LimitCPU = 61
process.LimitMemory = 42
process.Autostart = false
err = rsi.AddProcess(process)
require.NoError(t, err)
rs := rsi.(*restream)
task, ok := rs.tasks[process.ID]
require.True(t, ok)
status := task.ffmpeg.Status()
require.Equal(t, float64(61), status.CPU.Limit)
require.Equal(t, uint64(42), status.Memory.Limit)
}