From f1d71c202b2a2020a9863d1b3ae38da9df8aeb5a Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 8 Sep 2022 15:00:09 +0200 Subject: [PATCH] Fix HLS streaming and cleanup on diskfs --- CHANGELOG.md | 3 +- http/handler/api/diskfs.go | 12 +- http/middleware/hlsrewrite/hlsrewrite.go | 164 +++++++++++++++++++++++ http/middleware/session/HLS.go | 2 +- http/server.go | 10 ++ io/fs/disk.go | 9 ++ restream/fs/fs.go | 98 ++++++++------ 7 files changed, 248 insertions(+), 50 deletions(-) create mode 100644 http/middleware/hlsrewrite/hlsrewrite.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 80250a2f..d74ecdd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,12 +2,13 @@ ### Core v16.9.1 > v16.10.0 +- Add HLS session middleware to diskfs - Add /v3/metrics (get) endpoint to list all known metrics - Add logging HTTP request and response body sizes - Exclude .m3u8 and .mpd files from disk cache by default - Fix assigning cleanup rules for diskfs - Fix wrong path for swagger definition -- Fix process cleanup on delete +- Fix process cleanup on delete, remove empty directories from disk - Add process id and reference glob pattern matching - Fix SRT blocking port on restart (upgrade datarhei/gosrt) - Fix RTMP communication (datarhei/restreamer#385) diff --git a/http/handler/api/diskfs.go b/http/handler/api/diskfs.go index c143619d..b4bc7fb7 100644 --- a/http/handler/api/diskfs.go +++ b/http/handler/api/diskfs.go @@ -193,14 +193,18 @@ func (h *DiskFSHandler) ListFiles(c echo.Context) error { sort.Slice(files, sortFunc) - var fileinfos []api.FileInfo = make([]api.FileInfo, len(files)) + fileinfos := []api.FileInfo{} - for i, f := range files { - fileinfos[i] = api.FileInfo{ + for _, f := range files { + if f.IsDir() { + continue + } + + fileinfos = append(fileinfos, api.FileInfo{ Name: f.Name(), Size: f.Size(), LastMod: f.ModTime().Unix(), - } + }) } return c.JSON(http.StatusOK, fileinfos) diff --git a/http/middleware/hlsrewrite/hlsrewrite.go 
b/http/middleware/hlsrewrite/hlsrewrite.go
new file mode 100644
index 00000000..674228bf
--- /dev/null
+++ b/http/middleware/hlsrewrite/hlsrewrite.go
@@ -0,0 +1,164 @@
+package hlsrewrite
+
+import (
+	"bufio"
+	"bytes"
+	"net/http"
+	"strings"
+
+	"github.com/labstack/echo/v4"
+	"github.com/labstack/echo/v4/middleware"
+)
+
+type HLSRewriteConfig struct {
+	// Skipper defines a function to skip middleware.
+	Skipper    middleware.Skipper
+	PathPrefix string
+}
+
+var DefaultHLSRewriteConfig = HLSRewriteConfig{
+	Skipper: func(c echo.Context) bool {
+		req := c.Request()
+
+		return !strings.HasSuffix(req.URL.Path, ".m3u8")
+	},
+	PathPrefix: "",
+}
+
+// NewHLSRewrite returns a new HLS rewrite middleware with default config
+func NewHLSRewrite() echo.MiddlewareFunc {
+	return NewHLSRewriteWithConfig(DefaultHLSRewriteConfig)
+}
+
+type hlsrewrite struct {
+	pathPrefix string
+}
+
+func NewHLSRewriteWithConfig(config HLSRewriteConfig) echo.MiddlewareFunc {
+	if config.Skipper == nil {
+		config.Skipper = DefaultHLSRewriteConfig.Skipper
+	}
+
+	pathPrefix := config.PathPrefix
+	if len(pathPrefix) != 0 {
+		if !strings.HasSuffix(pathPrefix, "/") {
+			pathPrefix += "/"
+		}
+	}
+
+	hls := hlsrewrite{
+		pathPrefix: pathPrefix,
+	}
+
+	return func(next echo.HandlerFunc) echo.HandlerFunc {
+		return func(c echo.Context) error {
+			if config.Skipper(c) {
+				return next(c)
+			}
+
+			req := c.Request()
+
+			if req.Method == "GET" || req.Method == "HEAD" {
+				return hls.rewrite(c, next)
+			}
+
+			return next(c)
+		}
+	}
+}
+
+func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error {
+	req := c.Request()
+	res := c.Response()
+
+	path := req.URL.Path
+
+	isM3U8 := strings.HasSuffix(path, ".m3u8")
+
+	rewrite := false
+
+	if isM3U8 {
+		rewrite = true
+	}
+
+	var rewriter *hlsRewriter
+
+	// Keep the current writer for later
+	writer := res.Writer
+
+	if rewrite {
+		// Put the session rewriter in the middle. This will collect
+		// the data that we need to rewrite.
+		rewriter = &hlsRewriter{
+			ResponseWriter: res.Writer,
+		}
+
+		res.Writer = rewriter
+	}
+
+	if err := next(c); err != nil {
+		c.Error(err)
+	}
+
+	// Restore the original writer
+	res.Writer = writer
+
+	if rewrite {
+		if res.Status != 200 {
+			res.Write(rewriter.buffer.Bytes())
+			return nil
+		}
+
+		// Rewrite the data before sending it to the client
+		rewriter.rewrite(h.pathPrefix)
+
+		res.Header().Set("Cache-Control", "private")
+		res.Write(rewriter.buffer.Bytes())
+	}
+
+	return nil
+}
+
+type hlsRewriter struct {
+	http.ResponseWriter
+	buffer bytes.Buffer
+}
+
+func (g *hlsRewriter) Write(data []byte) (int, error) {
+	// Write the data into internal buffer for later rewrite
+	w, err := g.buffer.Write(data)
+
+	return w, err
+}
+
+func (g *hlsRewriter) rewrite(pathPrefix string) {
+	var buffer bytes.Buffer
+
+	// Find all URLs in the .m3u8 and strip the path prefix from them
+	scanner := bufio.NewScanner(&g.buffer)
+	for scanner.Scan() {
+		line := scanner.Text()
+
+		// Write empty lines unmodified
+		if len(line) == 0 {
+			buffer.WriteString(line + "\n")
+			continue
+		}
+
+		// Write comments unmodified
+		if strings.HasPrefix(line, "#") {
+			buffer.WriteString(line + "\n")
+			continue
+		}
+
+		// Rewrite
+		line = strings.TrimPrefix(line, pathPrefix)
+		buffer.WriteString(line + "\n")
+	}
+
+	if err := scanner.Err(); err != nil {
+		return
+	}
+
+	g.buffer = buffer
+}
diff --git a/http/middleware/session/HLS.go b/http/middleware/session/HLS.go
index 3de73be4..cb0768ca 100644
--- a/http/middleware/session/HLS.go
+++ b/http/middleware/session/HLS.go
@@ -51,7 +51,7 @@ type hls struct {
 // NewHLS returns a new HLS session middleware
 func NewHLSWithConfig(config HLSConfig) echo.MiddlewareFunc {
 	if config.Skipper == nil {
-		config.Skipper = DefaultHTTPConfig.Skipper
+		config.Skipper = DefaultHLSConfig.Skipper
 	}
 
 	if config.EgressCollector == nil {
diff --git a/http/server.go b/http/server.go
index 4503a7b0..00ce8896 100644
--- a/http/server.go
+++ b/http/server.go
@@ 
-54,6 +54,7 @@ import ( mwcache "github.com/datarhei/core/v16/http/middleware/cache" mwcors "github.com/datarhei/core/v16/http/middleware/cors" mwgzip "github.com/datarhei/core/v16/http/middleware/gzip" + mwhlsrewrite "github.com/datarhei/core/v16/http/middleware/hlsrewrite" mwiplimit "github.com/datarhei/core/v16/http/middleware/iplimit" mwlog "github.com/datarhei/core/v16/http/middleware/log" mwmime "github.com/datarhei/core/v16/http/middleware/mime" @@ -144,6 +145,7 @@ type server struct { cors echo.MiddlewareFunc cache echo.MiddlewareFunc session echo.MiddlewareFunc + hlsrewrite echo.MiddlewareFunc } memfs struct { @@ -184,6 +186,10 @@ func NewServer(config Config) (Server, error) { config.Cache, ) + s.middleware.hlsrewrite = mwhlsrewrite.NewHLSRewriteWithConfig(mwhlsrewrite.HLSRewriteConfig{ + PathPrefix: config.DiskFS.Base(), + }) + s.memfs.enableAuth = config.MemFS.EnableAuth s.memfs.username = config.MemFS.Username s.memfs.password = config.MemFS.Password @@ -445,6 +451,10 @@ func (s *server) setRoutes() { if s.middleware.cache != nil { fs.Use(s.middleware.cache) } + fs.Use(s.middleware.hlsrewrite) + if s.middleware.session != nil { + fs.Use(s.middleware.session) + } fs.GET("", s.handler.diskfs.GetFile) fs.HEAD("", s.handler.diskfs.GetFile) diff --git a/io/fs/disk.go b/io/fs/disk.go index 6d4c09ab..bf9e1843 100644 --- a/io/fs/disk.go +++ b/io/fs/disk.go @@ -291,11 +291,19 @@ func (fs *diskFilesystem) List(pattern string) []FileInfo { files := []FileInfo{} fs.walk(func(path string, info os.FileInfo) { + if path == fs.dir { + return + } + name := strings.TrimPrefix(path, fs.dir) if name[0] != os.PathSeparator { name = string(os.PathSeparator) + name } + if info.IsDir() { + name += "/" + } + if len(pattern) != 0 { if ok, _ := glob.Match(pattern, name, '/'); !ok { return @@ -319,6 +327,7 @@ func (fs *diskFilesystem) walk(walkfn func(path string, info os.FileInfo)) { } if info.IsDir() { + walkfn(path, info) return nil } diff --git a/restream/fs/fs.go 
b/restream/fs/fs.go index 4769597f..29216aa9 100644 --- a/restream/fs/fs.go +++ b/restream/fs/fs.go @@ -53,52 +53,52 @@ type filesystem struct { } func New(config Config) Filesystem { - fs := &filesystem{ + rfs := &filesystem{ Filesystem: config.FS, logger: config.Logger, } - if fs.logger == nil { - fs.logger = log.New("") + if rfs.logger == nil { + rfs.logger = log.New("") } - fs.cleanupPatterns = make(map[string][]Pattern) + rfs.cleanupPatterns = make(map[string][]Pattern) // already drain the stop - fs.stopOnce.Do(func() {}) + rfs.stopOnce.Do(func() {}) - return fs + return rfs } -func (fs *filesystem) Start() { - fs.startOnce.Do(func() { +func (rfs *filesystem) Start() { + rfs.startOnce.Do(func() { ctx, cancel := context.WithCancel(context.Background()) - fs.stopTicker = cancel - go fs.cleanupTicker(ctx, time.Second) + rfs.stopTicker = cancel + go rfs.cleanupTicker(ctx, time.Second) - fs.stopOnce = sync.Once{} + rfs.stopOnce = sync.Once{} - fs.logger.Debug().Log("Starting cleanup") + rfs.logger.Debug().Log("Starting cleanup") }) } -func (fs *filesystem) Stop() { - fs.stopOnce.Do(func() { - fs.stopTicker() +func (rfs *filesystem) Stop() { + rfs.stopOnce.Do(func() { + rfs.stopTicker() - fs.startOnce = sync.Once{} + rfs.startOnce = sync.Once{} - fs.logger.Debug().Log("Stopping cleanup") + rfs.logger.Debug().Log("Stopping cleanup") }) } -func (fs *filesystem) SetCleanup(id string, patterns []Pattern) { +func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) { if len(patterns) == 0 { return } for _, p := range patterns { - fs.logger.Debug().WithFields(log.Fields{ + rfs.logger.Debug().WithFields(log.Fields{ "id": id, "pattern": p.Pattern, "max_files": p.MaxFiles, @@ -106,38 +106,47 @@ func (fs *filesystem) SetCleanup(id string, patterns []Pattern) { }).Log("Add pattern") } - fs.cleanupLock.Lock() - defer fs.cleanupLock.Unlock() + rfs.cleanupLock.Lock() + defer rfs.cleanupLock.Unlock() - fs.cleanupPatterns[id] = append(fs.cleanupPatterns[id], patterns...) 
+ rfs.cleanupPatterns[id] = append(rfs.cleanupPatterns[id], patterns...) } -func (fs *filesystem) UnsetCleanup(id string) { - fs.logger.Debug().WithField("id", id).Log("Remove pattern group") +func (rfs *filesystem) UnsetCleanup(id string) { + rfs.logger.Debug().WithField("id", id).Log("Remove pattern group") - fs.cleanupLock.Lock() - defer fs.cleanupLock.Unlock() + rfs.cleanupLock.Lock() + defer rfs.cleanupLock.Unlock() - patterns := fs.cleanupPatterns[id] - delete(fs.cleanupPatterns, id) + patterns := rfs.cleanupPatterns[id] + delete(rfs.cleanupPatterns, id) - fs.purge(patterns) + rfs.purge(patterns) } -func (fs *filesystem) cleanup() { - fs.cleanupLock.RLock() - defer fs.cleanupLock.RUnlock() +func (rfs *filesystem) cleanup() { + rfs.cleanupLock.RLock() + defer rfs.cleanupLock.RUnlock() - for _, patterns := range fs.cleanupPatterns { + for _, patterns := range rfs.cleanupPatterns { for _, pattern := range patterns { - files := fs.Filesystem.List(pattern.Pattern) + filesAndDirs := rfs.Filesystem.List(pattern.Pattern) + + files := []fs.FileInfo{} + for _, f := range filesAndDirs { + if f.IsDir() { + continue + } + + files = append(files, f) + } sort.Slice(files, func(i, j int) bool { return files[i].ModTime().Before(files[j].ModTime()) }) if pattern.MaxFiles > 0 && uint(len(files)) > pattern.MaxFiles { for i := uint(0); i < uint(len(files))-pattern.MaxFiles; i++ { - fs.logger.Debug().WithField("path", files[i].Name()).Log("Remove file because MaxFiles is exceeded") - fs.Filesystem.Delete(files[i].Name()) + rfs.logger.Debug().WithField("path", files[i].Name()).Log("Remove file because MaxFiles is exceeded") + rfs.Filesystem.Delete(files[i].Name()) } } @@ -146,8 +155,8 @@ func (fs *filesystem) cleanup() { for _, f := range files { if f.ModTime().Before(bestBefore) { - fs.logger.Debug().WithField("path", f.Name()).Log("Remove file because MaxFileAge is exceeded") - fs.Filesystem.Delete(f.Name()) + rfs.logger.Debug().WithField("path", f.Name()).Log("Remove file 
because MaxFileAge is exceeded") + rfs.Filesystem.Delete(f.Name()) } } } @@ -155,16 +164,17 @@ func (fs *filesystem) cleanup() { } } -func (fs *filesystem) purge(patterns []Pattern) (nfiles uint64) { +func (rfs *filesystem) purge(patterns []Pattern) (nfiles uint64) { for _, pattern := range patterns { if !pattern.PurgeOnDelete { continue } - files := fs.Filesystem.List(pattern.Pattern) + files := rfs.Filesystem.List(pattern.Pattern) + sort.Slice(files, func(i, j int) bool { return len(files[i].Name()) > len(files[j].Name()) }) for _, f := range files { - fs.logger.Debug().WithField("path", f.Name()).Log("Purging file") - fs.Filesystem.Delete(f.Name()) + rfs.logger.Debug().WithField("path", f.Name()).Log("Purging file") + rfs.Filesystem.Delete(f.Name()) nfiles++ } } @@ -172,7 +182,7 @@ func (fs *filesystem) purge(patterns []Pattern) (nfiles uint64) { return } -func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) { +func (rfs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -181,7 +191,7 @@ func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) case <-ctx.Done(): return case <-ticker.C: - fs.cleanup() + rfs.cleanup() } } }