Add S3 storage support

This commit is contained in:
Ingo Oppermann
2023-01-31 14:45:58 +01:00
parent c05e16b6a0
commit f519acfd71
351 changed files with 99292 additions and 1331 deletions

View File

@@ -62,6 +62,11 @@ func New(config Config) Filesystem {
rfs.logger = log.New("")
}
rfs.logger = rfs.logger.WithFields(log.Fields{
"name": config.FS.Name(),
"type": config.FS.Type(),
})
rfs.cleanupPatterns = make(map[string][]Pattern)
// already drain the stop

View File

@@ -62,8 +62,7 @@ type Config struct {
ID string
Name string
Store store.Store
DiskFS fs.Filesystem
MemFS fs.Filesystem
Filesystems []fs.Filesystem
Replace replace.Replacer
FFmpeg ffmpeg.FFmpeg
MaxProcesses int64
@@ -94,8 +93,8 @@ type restream struct {
maxProc int64
nProc int64
fs struct {
diskfs rfs.Filesystem
memfs rfs.Filesystem
list []rfs.Filesystem
diskfs []rfs.Filesystem
stopObserver context.CancelFunc
}
replace replace.Replacer
@@ -128,26 +127,18 @@ func New(config Config) (Restreamer, error) {
r.store = store.NewDummyStore(store.DummyConfig{})
}
if config.DiskFS != nil {
r.fs.diskfs = rfs.New(rfs.Config{
FS: config.DiskFS,
Logger: r.logger.WithComponent("Cleanup").WithField("type", "diskfs"),
for _, fs := range config.Filesystems {
fs := rfs.New(rfs.Config{
FS: fs,
Logger: r.logger.WithComponent("Cleanup"),
})
} else {
r.fs.diskfs = rfs.New(rfs.Config{
FS: fs.NewDummyFilesystem(),
})
}
if config.MemFS != nil {
r.fs.memfs = rfs.New(rfs.Config{
FS: config.MemFS,
Logger: r.logger.WithComponent("Cleanup").WithField("type", "memfs"),
})
} else {
r.fs.memfs = rfs.New(rfs.Config{
FS: fs.NewDummyFilesystem(),
})
r.fs.list = append(r.fs.list, fs)
// Add the diskfs filesystems also to a separate array. We need it later for input and output validation
if fs.Type() == "diskfs" {
r.fs.diskfs = append(r.fs.diskfs, fs)
}
}
if r.replace == nil {
@@ -186,12 +177,16 @@ func (r *restream) Start() {
r.setCleanup(id, t.config)
}
r.fs.diskfs.Start()
r.fs.memfs.Start()
ctx, cancel := context.WithCancel(context.Background())
r.fs.stopObserver = cancel
go r.observe(ctx, 10*time.Second)
for _, fs := range r.fs.list {
fs.Start()
if fs.Type() == "diskfs" {
go r.observe(ctx, fs, 10*time.Second)
}
}
r.stopOnce = sync.Once{}
})
@@ -215,14 +210,16 @@ func (r *restream) Stop() {
r.fs.stopObserver()
r.fs.diskfs.Stop()
r.fs.memfs.Stop()
// Stop the cleanup jobs
for _, fs := range r.fs.list {
fs.Stop()
}
r.startOnce = sync.Once{}
})
}
func (r *restream) observe(ctx context.Context, interval time.Duration) {
func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@@ -231,14 +228,14 @@ func (r *restream) observe(ctx context.Context, interval time.Duration) {
case <-ctx.Done():
return
case <-ticker.C:
size, limit := r.fs.diskfs.Size()
size, limit := fs.Size()
isFull := false
if limit > 0 && size >= limit {
isFull = true
}
if isFull {
// Stop all tasks that write to disk
// Stop all tasks that write to this filesystem
r.lock.Lock()
for id, t := range r.tasks {
if !t.valid {
@@ -253,7 +250,7 @@ func (r *restream) observe(ctx context.Context, interval time.Duration) {
continue
}
r.logger.Warn().Log("Shutting down because disk is full")
r.logger.Warn().Log("Shutting down because filesystem is full")
r.stopProcess(id)
}
r.lock.Unlock()
@@ -503,34 +500,50 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
}
func (r *restream) setCleanup(id string, config *app.Config) {
rePrefix := regexp.MustCompile(`^([a-z]+):`)
for _, output := range config.Output {
for _, c := range output.Cleanup {
if strings.HasPrefix(c.Pattern, "memfs:") {
r.fs.memfs.SetCleanup(id, []rfs.Pattern{
{
Pattern: strings.TrimPrefix(c.Pattern, "memfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
},
})
} else if strings.HasPrefix(c.Pattern, "diskfs:") {
r.fs.diskfs.SetCleanup(id, []rfs.Pattern{
{
Pattern: strings.TrimPrefix(c.Pattern, "diskfs:"),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
},
matches := rePrefix.FindStringSubmatch(c.Pattern)
if matches == nil {
continue
}
name := matches[1]
// Support legacy names
if name == "diskfs" {
name = "disk"
} else if name == "memfs" {
name = "mem"
}
for _, fs := range r.fs.list {
if fs.Name() != name {
continue
}
pattern := rfs.Pattern{
Pattern: rePrefix.ReplaceAllString(c.Pattern, ""),
MaxFiles: c.MaxFiles,
MaxFileAge: time.Duration(c.MaxFileAge) * time.Second,
PurgeOnDelete: c.PurgeOnDelete,
}
fs.SetCleanup(id, []rfs.Pattern{
pattern,
})
break
}
}
}
}
func (r *restream) unsetCleanup(id string) {
r.fs.diskfs.UnsetCleanup(id)
r.fs.memfs.UnsetCleanup(id)
for _, fs := range r.fs.list {
fs.UnsetCleanup(id)
}
}
func (r *restream) setPlayoutPorts(t *task) error {
@@ -619,9 +632,23 @@ func (r *restream) validateConfig(config *app.Config) (bool, error) {
return false, fmt.Errorf("the address for input '#%s:%s' must not be empty", config.ID, io.ID)
}
io.Address, err = r.validateInputAddress(io.Address, r.fs.diskfs.Base())
if err != nil {
return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err)
if len(r.fs.diskfs) != 0 {
maxFails := 0
for _, fs := range r.fs.diskfs {
io.Address, err = r.validateInputAddress(io.Address, fs.Base())
if err != nil {
maxFails++
}
}
if maxFails == len(r.fs.diskfs) {
return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err)
}
} else {
io.Address, err = r.validateInputAddress(io.Address, "/")
if err != nil {
return false, fmt.Errorf("the address for input '#%s:%s' (%s) is invalid: %w", config.ID, io.ID, io.Address, err)
}
}
}
@@ -651,15 +678,33 @@ func (r *restream) validateConfig(config *app.Config) (bool, error) {
return false, fmt.Errorf("the address for output '#%s:%s' must not be empty", config.ID, io.ID)
}
isFile := false
if len(r.fs.diskfs) != 0 {
maxFails := 0
for _, fs := range r.fs.diskfs {
isFile := false
io.Address, isFile, err = r.validateOutputAddress(io.Address, fs.Base())
if err != nil {
maxFails++
}
io.Address, isFile, err = r.validateOutputAddress(io.Address, r.fs.diskfs.Base())
if err != nil {
return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err)
}
if isFile {
hasFiles = true
}
}
if isFile {
hasFiles = true
if maxFails == len(r.fs.diskfs) {
return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err)
}
} else {
isFile := false
io.Address, isFile, err = r.validateOutputAddress(io.Address, "/")
if err != nil {
return false, fmt.Errorf("the address for output '#%s:%s' is invalid: %w", config.ID, io.ID, err)
}
if isFile {
hasFiles = true
}
}
}