mirror of
https://github.com/datarhei/core.git
synced 2025-09-26 20:11:29 +08:00
Compare commits
8 Commits
0e38648b70
...
5e7c8ea469
Author | SHA1 | Date | |
---|---|---|---|
![]() |
5e7c8ea469 | ||
![]() |
981fcd4dd3 | ||
![]() |
91fbba3f4b | ||
![]() |
f827ca9dd1 | ||
![]() |
96f1e81fed | ||
![]() |
17006edeb6 | ||
![]() |
d64c7b377e | ||
![]() |
fbddaf3de7 |
21
Dockerfile.ffmpeg
Normal file
21
Dockerfile.ffmpeg
Normal file
@@ -0,0 +1,21 @@
|
||||
ARG GOLANG_IMAGE=golang:1.24-alpine3.21
|
||||
ARG BUILD_IMAGE=alpine:3.21
|
||||
|
||||
# Cross-Compilation
|
||||
# https://www.docker.com/blog/faster-multi-platform-builds-dockerfile-cross-compilation-guide/
|
||||
FROM --platform=$BUILDPLATFORM $GOLANG_IMAGE AS builder
|
||||
|
||||
ARG TARGETOS TARGETARCH TARGETVARIANT
|
||||
ENV GOOS=$TARGETOS GOARCH=$TARGETARCH GOARM=$TARGETVARIANT
|
||||
|
||||
COPY . /dist/core
|
||||
|
||||
RUN cd /dist/core/internal/testhelper/ffmpeg && \
|
||||
go build
|
||||
|
||||
FROM $BUILD_IMAGE
|
||||
|
||||
COPY --from=builder /dist/core/internal/testhelper/ffmpeg/ffmpeg /usr/bin/ffmpeg
|
||||
|
||||
ENTRYPOINT ["/usr/local/bin/ffmpeg"]
|
||||
WORKDIR /usr/local/bin
|
@@ -618,7 +618,7 @@ func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError {
|
||||
"nodeid": v.nodeid,
|
||||
}).Log("Updating process")
|
||||
case processOpDelete:
|
||||
err := c.manager.ProcessDelete(v.nodeid, v.processid)
|
||||
err := c.manager.ProcessDelete(v.nodeid, v.processid, true)
|
||||
if err != nil {
|
||||
opErr = processOpError{
|
||||
processid: v.processid,
|
||||
@@ -701,7 +701,7 @@ func (c *cluster) applyOp(op interface{}, logger log.Logger) processOpError {
|
||||
}
|
||||
}
|
||||
|
||||
err = c.manager.ProcessDelete(v.fromNodeid, v.config.ProcessID())
|
||||
err = c.manager.ProcessDelete(v.fromNodeid, v.config.ProcessID(), false)
|
||||
if err != nil {
|
||||
//opErr = processOpError{
|
||||
// processid: v.config.ProcessID(),
|
||||
|
@@ -166,7 +166,7 @@ func (n *Core) connect() error {
|
||||
Transport: tr,
|
||||
Timeout: 0,
|
||||
},
|
||||
Timeout: 5 * time.Second,
|
||||
Timeout: 10 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating client failed (%s): %w", address, err)
|
||||
@@ -337,7 +337,7 @@ func (n *Core) ProcessCommand(id app.ProcessID, command string) error {
|
||||
return client.ProcessCommand(id, command)
|
||||
}
|
||||
|
||||
func (n *Core) ProcessDelete(id app.ProcessID) error {
|
||||
func (n *Core) ProcessDelete(id app.ProcessID, purge bool) error {
|
||||
n.lock.RLock()
|
||||
client := n.client
|
||||
n.lock.RUnlock()
|
||||
@@ -346,7 +346,7 @@ func (n *Core) ProcessDelete(id app.ProcessID) error {
|
||||
return ErrNoPeer
|
||||
}
|
||||
|
||||
return client.ProcessDelete(id)
|
||||
return client.ProcessDelete(id, purge)
|
||||
}
|
||||
|
||||
func (n *Core) ProcessUpdate(id app.ProcessID, config *app.Config, metadata map[string]any, force bool) error {
|
||||
|
@@ -297,10 +297,16 @@ func (p *Manager) FilesystemGetFileInfo(prefix, path string) (int64, time.Time,
|
||||
return size, lastModified, nil
|
||||
}
|
||||
|
||||
type mediaInfo struct {
|
||||
nodeid string
|
||||
lastModified time.Time
|
||||
}
|
||||
|
||||
func (p *Manager) getNodeIDForMedia(prefix, path string) (string, error) {
|
||||
// this is only for mem and disk prefixes
|
||||
nodesChan := make(chan string, 16)
|
||||
nodeids := []string{}
|
||||
mediaChan := make(chan mediaInfo, 16)
|
||||
nodeid := ""
|
||||
lastModified := time.Time{}
|
||||
|
||||
wgList := sync.WaitGroup{}
|
||||
wgList.Add(1)
|
||||
@@ -308,12 +314,17 @@ func (p *Manager) getNodeIDForMedia(prefix, path string) (string, error) {
|
||||
go func() {
|
||||
defer wgList.Done()
|
||||
|
||||
for nodeid := range nodesChan {
|
||||
if len(nodeid) == 0 {
|
||||
for media := range mediaChan {
|
||||
if len(media.nodeid) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
nodeids = append(nodeids, nodeid)
|
||||
if !media.lastModified.After(lastModified) {
|
||||
continue
|
||||
}
|
||||
|
||||
nodeid = media.nodeid
|
||||
lastModified = media.lastModified
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -323,30 +334,33 @@ func (p *Manager) getNodeIDForMedia(prefix, path string) (string, error) {
|
||||
for id, n := range p.nodes {
|
||||
wg.Add(1)
|
||||
|
||||
go func(nodeid string, node *Node, p chan<- string) {
|
||||
go func(nodeid string, node *Node, p chan<- mediaInfo) {
|
||||
defer wg.Done()
|
||||
|
||||
_, _, err := node.Core().MediaGetInfo(prefix, path)
|
||||
_, lastModified, err := node.Core().MediaGetInfo(prefix, path)
|
||||
if err != nil {
|
||||
nodeid = ""
|
||||
}
|
||||
|
||||
p <- nodeid
|
||||
}(id, n, nodesChan)
|
||||
p <- mediaInfo{
|
||||
nodeid: nodeid,
|
||||
lastModified: lastModified,
|
||||
}
|
||||
}(id, n, mediaChan)
|
||||
}
|
||||
p.lock.RUnlock()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
close(nodesChan)
|
||||
close(mediaChan)
|
||||
|
||||
wgList.Wait()
|
||||
|
||||
if len(nodeids) == 0 {
|
||||
if len(nodeid) == 0 {
|
||||
return "", fmt.Errorf("file not found")
|
||||
}
|
||||
|
||||
return nodeids[0], nil
|
||||
return nodeid, nil
|
||||
}
|
||||
|
||||
func (p *Manager) getNodeForMedia(prefix, path string) (*Node, error) {
|
||||
@@ -363,7 +377,7 @@ func (p *Manager) getNodeForMedia(prefix, path string) (*Node, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.cache.Put(prefix+":"+path, id, 5*time.Second)
|
||||
p.cache.Put(prefix+":"+path, id, 2*time.Second)
|
||||
|
||||
return p.NodeGet(id)
|
||||
}
|
||||
@@ -572,13 +586,13 @@ func (p *Manager) ProcessAdd(nodeid string, config *app.Config, metadata map[str
|
||||
return node.Core().ProcessAdd(config, metadata)
|
||||
}
|
||||
|
||||
func (p *Manager) ProcessDelete(nodeid string, id app.ProcessID) error {
|
||||
func (p *Manager) ProcessDelete(nodeid string, id app.ProcessID, purge bool) error {
|
||||
node, err := p.NodeGet(nodeid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return node.Core().ProcessDelete(id)
|
||||
return node.Core().ProcessDelete(id, purge)
|
||||
}
|
||||
|
||||
func (p *Manager) ProcessUpdate(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]any, force bool) error {
|
||||
|
@@ -42,7 +42,7 @@ func TestNew(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, Skills{
|
||||
FFmpeg: ffmpeg{
|
||||
Version: "4.4.1",
|
||||
Version: "7.1.1",
|
||||
Compiler: "gcc 10.3.1 (Alpine 10.3.1_git20211027) 20211027",
|
||||
Configuration: "--extra-version=datarhei --prefix=/usr --extra-libs='-lpthread -lm -lz -lsupc++ -lstdc++ -lssl -lcrypto -lz -lc -ldl' --enable-nonfree --enable-gpl --enable-version3 --enable-postproc --enable-static --enable-openssl --enable-omx --enable-omx-rpi --enable-mmal --enable-v4l2_m2m --enable-libfreetype --enable-libsrt --enable-libx264 --enable-libx265 --enable-libvpx --enable-libmp3lame --enable-libopus --enable-libvorbis --disable-ffplay --disable-debug --disable-doc --disable-shared",
|
||||
Libraries: []Library{
|
||||
|
@@ -63,7 +63,7 @@ type RestClient interface {
|
||||
ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process
|
||||
Process(id app.ProcessID, filter []string) (api.Process, error) // GET /v3/process/{id}
|
||||
ProcessUpdate(id app.ProcessID, p *app.Config, metadata map[string]any, force bool) error // PUT /v3/process/{id}
|
||||
ProcessDelete(id app.ProcessID) error // DELETE /v3/process/{id}
|
||||
ProcessDelete(id app.ProcessID, purge bool) error // DELETE /v3/process/{id}
|
||||
ProcessCommand(id app.ProcessID, command string) error // PUT /v3/process/{id}/command
|
||||
ProcessProbe(id app.ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe
|
||||
ProcessProbeConfig(config *app.Config) (api.Probe, error) // POST /v3/process/probe
|
||||
|
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/datarhei/core/v16/encoding/json"
|
||||
@@ -127,9 +128,10 @@ func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *restclient) ProcessDelete(id app.ProcessID) error {
|
||||
func (r *restclient) ProcessDelete(id app.ProcessID, purge bool) error {
|
||||
query := &url.Values{}
|
||||
query.Set("domain", id.Domain)
|
||||
query.Set("purge", strconv.FormatBool(purge))
|
||||
|
||||
_, err := r.call("DELETE", "/v3/process/"+url.PathEscape(id.ID), query, nil, "", nil)
|
||||
|
||||
|
@@ -37,9 +37,9 @@ func NewClusterFS(name string, fs fs.Filesystem, proxy *node.Manager) Filesystem
|
||||
|
||||
func (fs *filesystem) Open(path string) fs.File {
|
||||
// Check if the file is locally available
|
||||
if file := fs.Filesystem.Open(path); file != nil {
|
||||
return file
|
||||
}
|
||||
//if file := fs.Filesystem.Open(path); file != nil {
|
||||
// return file
|
||||
//}
|
||||
|
||||
// Check if the file is available in the cluster
|
||||
size, lastModified, err := fs.proxy.FilesystemGetFileInfo(fs.name, path)
|
||||
@@ -53,7 +53,7 @@ func (fs *filesystem) Open(path string) fs.File {
|
||||
},
|
||||
name: path,
|
||||
size: size,
|
||||
lastModiefied: lastModified,
|
||||
lastModified: lastModified,
|
||||
}
|
||||
|
||||
return file
|
||||
@@ -65,7 +65,7 @@ type file struct {
|
||||
getFile func(offset int64) (io.ReadCloser, error)
|
||||
name string
|
||||
size int64
|
||||
lastModiefied time.Time
|
||||
lastModified time.Time
|
||||
}
|
||||
|
||||
func (f *file) Read(p []byte) (int, error) {
|
||||
@@ -128,7 +128,7 @@ func (f *file) Size() int64 {
|
||||
}
|
||||
|
||||
func (f *file) ModTime() time.Time {
|
||||
return f.lastModiefied
|
||||
return f.lastModified
|
||||
}
|
||||
|
||||
func (f *file) IsLink() (string, bool) {
|
||||
|
@@ -61,7 +61,17 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
pmap[app.NewProcessID(p.ID, p.Domain)] = p
|
||||
pid := app.NewProcessID(p.ID, p.Domain)
|
||||
proc, ok := pmap[pid]
|
||||
if ok {
|
||||
// While moving a process from one node to another, they exist on both nodes.
|
||||
// This will select the latest incarnation of this process.
|
||||
if p.CreatedAt < proc.CreatedAt {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
pmap[pid] = p
|
||||
}
|
||||
|
||||
missing := []api.Process{}
|
||||
@@ -76,6 +86,14 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Prevent overshadowing existing processes with undeployed changes.
|
||||
// TODO: should undeployed changes be visible in a process config/state? They
|
||||
// are visible in the store process list, however.
|
||||
if _, ok := pmap[p.Config.ProcessID()]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
/*
|
||||
// Check if the process has been deployed
|
||||
if len(p.Error) == 0 {
|
||||
if _, ok := pmap[p.Config.ProcessID()]; ok {
|
||||
@@ -84,7 +102,7 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error {
|
||||
} else {
|
||||
delete(pmap, p.Config.ProcessID())
|
||||
}
|
||||
|
||||
*/
|
||||
process := api.Process{}
|
||||
process.UnmarshalStore(p, filter.config, filter.state, filter.report, filter.metadata)
|
||||
|
||||
|
@@ -233,6 +233,7 @@ func (h *ProcessHandler) Get(c echo.Context) error {
|
||||
// @Produce json
|
||||
// @Param id path string true "Process ID"
|
||||
// @Param domain query string false "Process domain"
|
||||
// @Param purge query string false "Whether to purge files"
|
||||
// @Success 200 {string} string
|
||||
// @Failure 400 {object} api.Error
|
||||
// @Failure 403 {object} api.Error
|
||||
@@ -244,6 +245,12 @@ func (h *ProcessHandler) Delete(c echo.Context) error {
|
||||
ctxuser := util.DefaultContext(c, "user", "")
|
||||
id := util.PathParam(c, "id")
|
||||
domain := util.DefaultQuery(c, "domain", "")
|
||||
purgeq := util.DefaultQuery(c, "purge", "true")
|
||||
|
||||
purge := false
|
||||
if x, err := strconv.ParseBool(purgeq); err == nil {
|
||||
purge = x
|
||||
}
|
||||
|
||||
tid := app.ProcessID{
|
||||
ID: id,
|
||||
@@ -258,7 +265,7 @@ func (h *ProcessHandler) Delete(c echo.Context) error {
|
||||
return h.apiErrorFromError(err)
|
||||
}
|
||||
|
||||
if err := h.restream.DeleteProcess(tid); err != nil {
|
||||
if err := h.restream.DeleteProcess(tid, purge); err != nil {
|
||||
return h.apiErrorFromError(err)
|
||||
}
|
||||
|
||||
|
@@ -5,13 +5,15 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/slices"
|
||||
)
|
||||
|
||||
func main() {
|
||||
header := `ffmpeg version 4.4.1-datarhei Copyright (c) 2000-2021 the FFmpeg developers
|
||||
header := `ffmpeg version 7.1.1-datarhei Copyright (c) 2000-2021 the FFmpeg developers
|
||||
built with gcc 10.3.1 (Alpine 10.3.1_git20211027) 20211027
|
||||
configuration: --extra-version=datarhei --prefix=/usr --extra-libs='-lpthread -lm -lz -lsupc++ -lstdc++ -lssl -lcrypto -lz -lc -ldl' --enable-nonfree --enable-gpl --enable-version3 --enable-postproc --enable-static --enable-openssl --enable-omx --enable-omx-rpi --enable-mmal --enable-v4l2_m2m --enable-libfreetype --enable-libsrt --enable-libx264 --enable-libx265 --enable-libvpx --enable-libmp3lame --enable-libopus --enable-libvorbis --disable-ffplay --disable-debug --disable-doc --disable-shared
|
||||
libavutil 56. 70.100 / 56. 70.100
|
||||
@@ -150,6 +152,10 @@ Output #0, hls, to './data/testsrc.m3u8':
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if lastArg == "-buildconf" {
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
switch lastArg {
|
||||
case "-codecs":
|
||||
fmt.Fprintf(os.Stderr, "%s\n", codecs)
|
||||
@@ -175,6 +181,33 @@ Output #0, hls, to './data/testsrc.m3u8':
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
for i, arg := range os.Args {
|
||||
if arg != "-startdelay" {
|
||||
continue
|
||||
}
|
||||
|
||||
if x, err := strconv.ParseUint(os.Args[i+1], 10, 32); err == nil {
|
||||
time.Sleep(time.Duration(x) * time.Second)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
stopDelay := time.Duration(0)
|
||||
|
||||
for i, arg := range os.Args {
|
||||
if arg != "-stopdelay" {
|
||||
continue
|
||||
}
|
||||
|
||||
if x, err := strconv.ParseUint(os.Args[i+1], 10, 32); err == nil {
|
||||
stopDelay = time.Duration(x) * time.Second
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "%s\n", prelude)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -197,12 +230,14 @@ Output #0, hls, to './data/testsrc.m3u8':
|
||||
}(ctx)
|
||||
|
||||
// Wait for interrupt signal to gracefully shutdown the app
|
||||
quit := make(chan os.Signal, 1)
|
||||
signal.Notify(quit, os.Interrupt)
|
||||
<-quit
|
||||
|
||||
cancel()
|
||||
|
||||
if stopDelay > 0 {
|
||||
time.Sleep(stopDelay)
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "\nExiting normally, received signal 2.\n")
|
||||
|
||||
os.Exit(255)
|
||||
|
@@ -808,7 +808,7 @@ func (p *process) stop(wait bool, reason string) error {
|
||||
// If the process in starting state, wait until the process has been started
|
||||
for {
|
||||
if p.getState() == stateStarting {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@@ -42,7 +42,7 @@ type Restreamer interface {
|
||||
|
||||
AddProcess(config *app.Config) error // Add a new process
|
||||
GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern string) []app.ProcessID // Get a list of process IDs based on patterns for ID and reference
|
||||
DeleteProcess(id app.ProcessID) error // Delete a process
|
||||
DeleteProcess(id app.ProcessID, purge bool) error // Delete a process
|
||||
UpdateProcess(id app.ProcessID, config *app.Config, force bool) error // Update a process
|
||||
StartProcess(id app.ProcessID) error // Start a process
|
||||
StopProcess(id app.ProcessID) error // Stop a process
|
||||
@@ -227,7 +227,7 @@ func (r *restream) Stop() {
|
||||
wg.Wait()
|
||||
|
||||
r.tasks.Range(func(id app.ProcessID, t *task) bool {
|
||||
r.unsetCleanup(id)
|
||||
r.unsetCleanup(id, false)
|
||||
return true
|
||||
})
|
||||
|
||||
@@ -743,16 +743,16 @@ func (r *restream) setCleanup(id app.ProcessID, config *app.Config) {
|
||||
continue
|
||||
}
|
||||
|
||||
fs.UpdateCleanup(id.String(), p)
|
||||
fs.UpdateCleanup(id.String(), p, true)
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *restream) unsetCleanup(id app.ProcessID) {
|
||||
func (r *restream) unsetCleanup(id app.ProcessID, purge bool) {
|
||||
for _, fs := range r.fs.list {
|
||||
fs.UpdateCleanup(id.String(), nil)
|
||||
fs.UpdateCleanup(id.String(), nil, purge)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1167,7 +1167,6 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config, force boo
|
||||
if !ok {
|
||||
return ErrUnknownProcess
|
||||
}
|
||||
|
||||
defer r.tasks.Unlock(id)
|
||||
|
||||
err := r.updateProcess(task, config, force)
|
||||
@@ -1233,7 +1232,7 @@ func (r *restream) updateProcess(task *task, config *app.Config, force bool) err
|
||||
t.Restore()
|
||||
|
||||
if !tid.Equal(task.ID()) {
|
||||
r.unsetCleanup(task.ID())
|
||||
r.unsetCleanup(task.ID(), true)
|
||||
r.tasks.LoadAndDelete(task.ID())
|
||||
}
|
||||
|
||||
@@ -1300,15 +1299,14 @@ func (r *restream) GetProcess(id app.ProcessID) (*app.Process, error) {
|
||||
return task.Process(), nil
|
||||
}
|
||||
|
||||
func (r *restream) DeleteProcess(id app.ProcessID) error {
|
||||
func (r *restream) DeleteProcess(id app.ProcessID, purge bool) error {
|
||||
task, ok := r.tasks.LoadAndLock(id)
|
||||
if !ok {
|
||||
return ErrUnknownProcess
|
||||
}
|
||||
defer r.tasks.Unlock(id)
|
||||
|
||||
err := r.deleteProcess(task)
|
||||
|
||||
r.tasks.Unlock(id)
|
||||
err := r.deleteProcess(task, purge)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1319,13 +1317,13 @@ func (r *restream) DeleteProcess(id app.ProcessID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *restream) deleteProcess(task *task) error {
|
||||
func (r *restream) deleteProcess(task *task, purge bool) error {
|
||||
if task.Order() != "stop" {
|
||||
return fmt.Errorf("the process with the ID '%s' is still running", task.String())
|
||||
}
|
||||
|
||||
r.unsetPlayoutPorts(task)
|
||||
r.unsetCleanup(task.ID())
|
||||
r.unsetCleanup(task.ID(), purge)
|
||||
|
||||
r.tasks.LoadAndDelete(task.ID())
|
||||
|
||||
@@ -1414,11 +1412,10 @@ func (r *restream) ReloadProcess(id app.ProcessID) error {
|
||||
if !ok {
|
||||
return ErrUnknownProcess
|
||||
}
|
||||
defer r.tasks.Unlock(id)
|
||||
|
||||
err := r.reloadProcess(task)
|
||||
|
||||
r.tasks.Unlock(id)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1607,11 +1604,10 @@ func (r *restream) ReloadSkills() error {
|
||||
}
|
||||
|
||||
func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) {
|
||||
task, ok := r.tasks.LoadAndRLock(id)
|
||||
task, ok := r.tasks.LoadUnsafe(id)
|
||||
if !ok {
|
||||
return "", ErrUnknownProcess
|
||||
}
|
||||
defer r.tasks.RUnlock(id)
|
||||
|
||||
port, ok := task.playout[inputid]
|
||||
if !ok {
|
||||
|
@@ -258,7 +258,7 @@ func TestRemoveProcess(t *testing.T) {
|
||||
err = rs.AddProcess(process)
|
||||
require.Equal(t, nil, err, "Failed to add process (%s)", err)
|
||||
|
||||
err = rs.DeleteProcess(tid)
|
||||
err = rs.DeleteProcess(tid, true)
|
||||
require.Equal(t, nil, err, "Set process not found (%s)", process.ID)
|
||||
|
||||
_, err = rs.GetProcess(tid)
|
||||
@@ -1534,7 +1534,7 @@ func TestProcessReplacer(t *testing.T) {
|
||||
process = &app.Config{
|
||||
ID: "314159265359",
|
||||
Reference: "refref",
|
||||
FFVersion: "^4.4.1",
|
||||
FFVersion: "^7.1.1",
|
||||
Input: []app.ConfigIO{
|
||||
{
|
||||
ID: "in_314159265359_refref",
|
||||
@@ -1777,7 +1777,7 @@ func BenchmarkGetProcessState(b *testing.B) {
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
rs.DeleteProcess(app.NewProcessID("test_"+strconv.Itoa(n), ""))
|
||||
rs.DeleteProcess(app.NewProcessID("test_"+strconv.Itoa(n), ""), true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -40,7 +40,7 @@ type Filesystem interface {
|
||||
fs.Filesystem
|
||||
|
||||
// UpdateCleanup
|
||||
UpdateCleanup(id string, patterns []Pattern)
|
||||
UpdateCleanup(id string, patterns []Pattern, purge bool)
|
||||
|
||||
// Start
|
||||
Start()
|
||||
@@ -128,7 +128,7 @@ func (rfs *filesystem) compilePatterns(patterns []Pattern) []Pattern {
|
||||
return patterns
|
||||
}
|
||||
|
||||
func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) {
|
||||
func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern, purge bool) {
|
||||
newPatterns = rfs.compilePatterns(newPatterns)
|
||||
|
||||
rfs.cleanupLock.Lock()
|
||||
@@ -175,7 +175,9 @@ func (rfs *filesystem) UpdateCleanup(id string, newPatterns []Pattern) {
|
||||
}).Log("Remove pattern")
|
||||
}
|
||||
|
||||
if purge {
|
||||
rfs.purge(onlyCurrent)
|
||||
}
|
||||
}
|
||||
|
||||
func (rfs *filesystem) cleanup() {
|
||||
|
@@ -34,7 +34,7 @@ func TestUpdateCleanup(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", patterns)
|
||||
cleanfs.UpdateCleanup("foobar", patterns, true)
|
||||
|
||||
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||
|
||||
@@ -44,21 +44,21 @@ func TestUpdateCleanup(t *testing.T) {
|
||||
MaxFileAge: 0,
|
||||
})
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", patterns)
|
||||
cleanfs.UpdateCleanup("foobar", patterns, true)
|
||||
|
||||
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||
|
||||
patterns[0].MaxFiles = 42
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", patterns)
|
||||
cleanfs.UpdateCleanup("foobar", patterns, true)
|
||||
|
||||
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns)
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", patterns[1:])
|
||||
cleanfs.UpdateCleanup("foobar", patterns[1:], true)
|
||||
|
||||
require.Equal(t, cleanfs.cleanupPatterns["foobar"], patterns[1:])
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", nil)
|
||||
cleanfs.UpdateCleanup("foobar", nil, true)
|
||||
|
||||
require.Empty(t, cleanfs.cleanupPatterns["foobar"])
|
||||
}
|
||||
@@ -81,7 +81,7 @@ func TestMaxFiles(t *testing.T) {
|
||||
MaxFiles: 3,
|
||||
MaxFileAge: 0,
|
||||
},
|
||||
})
|
||||
}, true)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
|
||||
cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
|
||||
@@ -130,7 +130,7 @@ func TestMaxAge(t *testing.T) {
|
||||
MaxFiles: 0,
|
||||
MaxFileAge: 3 * time.Second,
|
||||
},
|
||||
})
|
||||
}, true)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
|
||||
cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
|
||||
@@ -179,7 +179,7 @@ func TestUnsetCleanup(t *testing.T) {
|
||||
MaxFiles: 3,
|
||||
MaxFileAge: 0,
|
||||
},
|
||||
})
|
||||
}, true)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
|
||||
cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
|
||||
@@ -207,7 +207,7 @@ func TestUnsetCleanup(t *testing.T) {
|
||||
return true
|
||||
}, 3*time.Second, time.Second)
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", nil)
|
||||
cleanfs.UpdateCleanup("foobar", nil, true)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
|
||||
|
||||
@@ -249,7 +249,7 @@ func TestPurge(t *testing.T) {
|
||||
MaxFileAge: 0,
|
||||
PurgeOnDelete: true,
|
||||
},
|
||||
})
|
||||
}, true)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_0.ts", strings.NewReader("chunk_0"), -1)
|
||||
cleanfs.WriteFileReader("/chunk_1.ts", strings.NewReader("chunk_1"), -1)
|
||||
@@ -277,7 +277,7 @@ func TestPurge(t *testing.T) {
|
||||
return true
|
||||
}, 3*time.Second, time.Second)
|
||||
|
||||
cleanfs.UpdateCleanup("foobar", nil)
|
||||
cleanfs.UpdateCleanup("foobar", nil, true)
|
||||
|
||||
cleanfs.WriteFileReader("/chunk_4.ts", strings.NewReader("chunk_4"), -1)
|
||||
|
||||
@@ -350,7 +350,7 @@ func BenchmarkCleanup(b *testing.B) {
|
||||
},
|
||||
}
|
||||
|
||||
cleanfs.UpdateCleanup(id, patterns)
|
||||
cleanfs.UpdateCleanup(id, patterns, true)
|
||||
|
||||
ids[i] = id
|
||||
}
|
||||
@@ -429,7 +429,7 @@ func BenchmarkPurge(b *testing.B) {
|
||||
},
|
||||
}
|
||||
|
||||
cleanfs.UpdateCleanup(id, patterns)
|
||||
cleanfs.UpdateCleanup(id, patterns, false)
|
||||
|
||||
ids[i] = id
|
||||
}
|
||||
|
@@ -84,6 +84,8 @@ func (m *Storage) Has(id app.ProcessID) bool {
|
||||
return hasTask
|
||||
}
|
||||
|
||||
// LoadUnsafe returns the value stored in the map for a key, or zero value of type V if no value is present.
|
||||
// The ok result indicates whether value was found in the map.
|
||||
func (m *Storage) LoadUnsafe(id app.ProcessID) (*task, bool) {
|
||||
mt, ok := m.tasks.Load(id)
|
||||
if !ok {
|
||||
@@ -93,28 +95,6 @@ func (m *Storage) LoadUnsafe(id app.ProcessID) (*task, bool) {
|
||||
return mt.task, true
|
||||
}
|
||||
|
||||
// LoadAndRLock returns the value stored in the map for a key, or zero value of type V if no value is present.
|
||||
// The ok result indicates whether value was found in the map.
|
||||
func (m *Storage) LoadAndRLock(id app.ProcessID) (*task, bool) {
|
||||
mt, ok := m.tasks.Load(id)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
mt.lock.RLock()
|
||||
|
||||
return mt.task, true
|
||||
}
|
||||
|
||||
func (m *Storage) RUnlock(id app.ProcessID) {
|
||||
mt, ok := m.tasks.Load(id)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
mt.lock.RUnlock()
|
||||
}
|
||||
|
||||
func (m *Storage) LoadAndLock(id app.ProcessID) (*task, bool) {
|
||||
mt, ok := m.tasks.Load(id)
|
||||
if !ok {
|
||||
|
Reference in New Issue
Block a user