mirror of
https://github.com/datarhei/core.git
synced 2025-09-26 20:11:29 +08:00
Fix HLS streaming and cleanup on diskfs
This commit is contained in:
@@ -2,12 +2,13 @@
|
||||
|
||||
### Core v16.9.1 > v16.10.0
|
||||
|
||||
- Add HLS session middleware to diskfs
|
||||
- Add /v3/metrics (get) endpoint to list all known metrics
|
||||
- Add logging HTTP request and response body sizes
|
||||
- Exclude .m3u8 and .mpd files from disk cache by default
|
||||
- Fix assigning cleanup rules for diskfs
|
||||
- Fix wrong path for swagger definition
|
||||
- Fix process cleanup on delete
|
||||
- Fix process cleanup on delete, remove empty directories from disk
|
||||
- Add process id and reference glob pattern matching
|
||||
- Fix SRT blocking port on restart (upgrade datarhei/gosrt)
|
||||
- Fix RTMP communication (datarhei/restreamer#385)
|
||||
|
@@ -193,14 +193,18 @@ func (h *DiskFSHandler) ListFiles(c echo.Context) error {
|
||||
|
||||
sort.Slice(files, sortFunc)
|
||||
|
||||
var fileinfos []api.FileInfo = make([]api.FileInfo, len(files))
|
||||
fileinfos := []api.FileInfo{}
|
||||
|
||||
for i, f := range files {
|
||||
fileinfos[i] = api.FileInfo{
|
||||
for _, f := range files {
|
||||
if f.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
fileinfos = append(fileinfos, api.FileInfo{
|
||||
Name: f.Name(),
|
||||
Size: f.Size(),
|
||||
LastMod: f.ModTime().Unix(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, fileinfos)
|
||||
|
164
http/middleware/hlsrewrite/hlsrewrite.go
Normal file
164
http/middleware/hlsrewrite/hlsrewrite.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package hlsrewrite
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/labstack/echo/v4/middleware"
|
||||
)
|
||||
|
||||
type HLSRewriteConfig struct {
|
||||
// Skipper defines a function to skip middleware.
|
||||
Skipper middleware.Skipper
|
||||
PathPrefix string
|
||||
}
|
||||
|
||||
var DefaultHLSRewriteConfig = HLSRewriteConfig{
|
||||
Skipper: func(c echo.Context) bool {
|
||||
req := c.Request()
|
||||
|
||||
return !strings.HasSuffix(req.URL.Path, ".m3u8")
|
||||
},
|
||||
PathPrefix: "",
|
||||
}
|
||||
|
||||
// NewHTTP returns a new HTTP session middleware with default config
|
||||
func NewHLSRewrite() echo.MiddlewareFunc {
|
||||
return NewHLSRewriteWithConfig(DefaultHLSRewriteConfig)
|
||||
}
|
||||
|
||||
type hlsrewrite struct {
|
||||
pathPrefix string
|
||||
}
|
||||
|
||||
func NewHLSRewriteWithConfig(config HLSRewriteConfig) echo.MiddlewareFunc {
|
||||
if config.Skipper == nil {
|
||||
config.Skipper = DefaultHLSRewriteConfig.Skipper
|
||||
}
|
||||
|
||||
pathPrefix := config.PathPrefix
|
||||
if len(pathPrefix) != 0 {
|
||||
if !strings.HasSuffix(pathPrefix, "/") {
|
||||
pathPrefix += "/"
|
||||
}
|
||||
}
|
||||
|
||||
hls := hlsrewrite{
|
||||
pathPrefix: pathPrefix,
|
||||
}
|
||||
|
||||
return func(next echo.HandlerFunc) echo.HandlerFunc {
|
||||
return func(c echo.Context) error {
|
||||
if config.Skipper(c) {
|
||||
return next(c)
|
||||
}
|
||||
|
||||
req := c.Request()
|
||||
|
||||
if req.Method == "GET" || req.Method == "HEAD" {
|
||||
return hls.rewrite(c, next)
|
||||
}
|
||||
|
||||
return next(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error {
|
||||
req := c.Request()
|
||||
res := c.Response()
|
||||
|
||||
path := req.URL.Path
|
||||
|
||||
isM3U8 := strings.HasSuffix(path, ".m3u8")
|
||||
|
||||
rewrite := false
|
||||
|
||||
if isM3U8 {
|
||||
rewrite = true
|
||||
}
|
||||
|
||||
var rewriter *hlsRewriter
|
||||
|
||||
// Keep the current writer for later
|
||||
writer := res.Writer
|
||||
|
||||
if rewrite {
|
||||
// Put the session rewriter in the middle. This will collect
|
||||
// the data that we need to rewrite.
|
||||
rewriter = &hlsRewriter{
|
||||
ResponseWriter: res.Writer,
|
||||
}
|
||||
|
||||
res.Writer = rewriter
|
||||
}
|
||||
|
||||
if err := next(c); err != nil {
|
||||
c.Error(err)
|
||||
}
|
||||
|
||||
// Restore the original writer
|
||||
res.Writer = writer
|
||||
|
||||
if rewrite {
|
||||
if res.Status != 200 {
|
||||
res.Write(rewriter.buffer.Bytes())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Rewrite the data befor sending it to the client
|
||||
rewriter.rewrite(h.pathPrefix)
|
||||
|
||||
res.Header().Set("Cache-Control", "private")
|
||||
res.Write(rewriter.buffer.Bytes())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// hlsRewriter wraps an http.ResponseWriter and buffers everything
// written to it, so that the buffered playlist can be modified before
// it is actually sent.
type hlsRewriter struct {
	http.ResponseWriter

	// buffer collects the response body for a later rewrite.
	buffer bytes.Buffer
}

// Write stores the data in the internal buffer for a later rewrite
// instead of passing it to the underlying writer.
func (g *hlsRewriter) Write(data []byte) (int, error) {
	return g.buffer.Write(data)
}

// rewrite strips pathPrefix from every URI line of the buffered .m3u8
// playlist. Empty lines and comment lines (starting with "#") are kept
// unmodified. On a scanner error the buffer is left as it is.
func (g *hlsRewriter) rewrite(pathPrefix string) {
	var buffer bytes.Buffer

	scanner := bufio.NewScanner(&g.buffer)
	for scanner.Scan() {
		line := scanner.Text()

		// Write empty lines and comments unmodified.
		if len(line) == 0 || strings.HasPrefix(line, "#") {
			buffer.WriteString(line + "\n")
			continue
		}

		// Strip the path prefix from the URI.
		buffer.WriteString(strings.TrimPrefix(line, pathPrefix) + "\n")
	}

	if err := scanner.Err(); err != nil {
		// No logger available here; keep the buffer untouched.
		return
	}

	g.buffer = buffer
}
|
@@ -51,7 +51,7 @@ type hls struct {
|
||||
// NewHLS returns a new HLS session middleware
|
||||
func NewHLSWithConfig(config HLSConfig) echo.MiddlewareFunc {
|
||||
if config.Skipper == nil {
|
||||
config.Skipper = DefaultHTTPConfig.Skipper
|
||||
config.Skipper = DefaultHLSConfig.Skipper
|
||||
}
|
||||
|
||||
if config.EgressCollector == nil {
|
||||
|
@@ -54,6 +54,7 @@ import (
|
||||
mwcache "github.com/datarhei/core/v16/http/middleware/cache"
|
||||
mwcors "github.com/datarhei/core/v16/http/middleware/cors"
|
||||
mwgzip "github.com/datarhei/core/v16/http/middleware/gzip"
|
||||
mwhlsrewrite "github.com/datarhei/core/v16/http/middleware/hlsrewrite"
|
||||
mwiplimit "github.com/datarhei/core/v16/http/middleware/iplimit"
|
||||
mwlog "github.com/datarhei/core/v16/http/middleware/log"
|
||||
mwmime "github.com/datarhei/core/v16/http/middleware/mime"
|
||||
@@ -144,6 +145,7 @@ type server struct {
|
||||
cors echo.MiddlewareFunc
|
||||
cache echo.MiddlewareFunc
|
||||
session echo.MiddlewareFunc
|
||||
hlsrewrite echo.MiddlewareFunc
|
||||
}
|
||||
|
||||
memfs struct {
|
||||
@@ -184,6 +186,10 @@ func NewServer(config Config) (Server, error) {
|
||||
config.Cache,
|
||||
)
|
||||
|
||||
s.middleware.hlsrewrite = mwhlsrewrite.NewHLSRewriteWithConfig(mwhlsrewrite.HLSRewriteConfig{
|
||||
PathPrefix: config.DiskFS.Base(),
|
||||
})
|
||||
|
||||
s.memfs.enableAuth = config.MemFS.EnableAuth
|
||||
s.memfs.username = config.MemFS.Username
|
||||
s.memfs.password = config.MemFS.Password
|
||||
@@ -445,6 +451,10 @@ func (s *server) setRoutes() {
|
||||
if s.middleware.cache != nil {
|
||||
fs.Use(s.middleware.cache)
|
||||
}
|
||||
fs.Use(s.middleware.hlsrewrite)
|
||||
if s.middleware.session != nil {
|
||||
fs.Use(s.middleware.session)
|
||||
}
|
||||
|
||||
fs.GET("", s.handler.diskfs.GetFile)
|
||||
fs.HEAD("", s.handler.diskfs.GetFile)
|
||||
|
@@ -291,11 +291,19 @@ func (fs *diskFilesystem) List(pattern string) []FileInfo {
|
||||
files := []FileInfo{}
|
||||
|
||||
fs.walk(func(path string, info os.FileInfo) {
|
||||
if path == fs.dir {
|
||||
return
|
||||
}
|
||||
|
||||
name := strings.TrimPrefix(path, fs.dir)
|
||||
if name[0] != os.PathSeparator {
|
||||
name = string(os.PathSeparator) + name
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
name += "/"
|
||||
}
|
||||
|
||||
if len(pattern) != 0 {
|
||||
if ok, _ := glob.Match(pattern, name, '/'); !ok {
|
||||
return
|
||||
@@ -319,6 +327,7 @@ func (fs *diskFilesystem) walk(walkfn func(path string, info os.FileInfo)) {
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
walkfn(path, info)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -53,52 +53,52 @@ type filesystem struct {
|
||||
}
|
||||
|
||||
func New(config Config) Filesystem {
|
||||
fs := &filesystem{
|
||||
rfs := &filesystem{
|
||||
Filesystem: config.FS,
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
if fs.logger == nil {
|
||||
fs.logger = log.New("")
|
||||
if rfs.logger == nil {
|
||||
rfs.logger = log.New("")
|
||||
}
|
||||
|
||||
fs.cleanupPatterns = make(map[string][]Pattern)
|
||||
rfs.cleanupPatterns = make(map[string][]Pattern)
|
||||
|
||||
// already drain the stop
|
||||
fs.stopOnce.Do(func() {})
|
||||
rfs.stopOnce.Do(func() {})
|
||||
|
||||
return fs
|
||||
return rfs
|
||||
}
|
||||
|
||||
func (fs *filesystem) Start() {
|
||||
fs.startOnce.Do(func() {
|
||||
func (rfs *filesystem) Start() {
|
||||
rfs.startOnce.Do(func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fs.stopTicker = cancel
|
||||
go fs.cleanupTicker(ctx, time.Second)
|
||||
rfs.stopTicker = cancel
|
||||
go rfs.cleanupTicker(ctx, time.Second)
|
||||
|
||||
fs.stopOnce = sync.Once{}
|
||||
rfs.stopOnce = sync.Once{}
|
||||
|
||||
fs.logger.Debug().Log("Starting cleanup")
|
||||
rfs.logger.Debug().Log("Starting cleanup")
|
||||
})
|
||||
}
|
||||
|
||||
func (fs *filesystem) Stop() {
|
||||
fs.stopOnce.Do(func() {
|
||||
fs.stopTicker()
|
||||
func (rfs *filesystem) Stop() {
|
||||
rfs.stopOnce.Do(func() {
|
||||
rfs.stopTicker()
|
||||
|
||||
fs.startOnce = sync.Once{}
|
||||
rfs.startOnce = sync.Once{}
|
||||
|
||||
fs.logger.Debug().Log("Stopping cleanup")
|
||||
rfs.logger.Debug().Log("Stopping cleanup")
|
||||
})
|
||||
}
|
||||
|
||||
func (fs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
||||
func (rfs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
||||
if len(patterns) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, p := range patterns {
|
||||
fs.logger.Debug().WithFields(log.Fields{
|
||||
rfs.logger.Debug().WithFields(log.Fields{
|
||||
"id": id,
|
||||
"pattern": p.Pattern,
|
||||
"max_files": p.MaxFiles,
|
||||
@@ -106,38 +106,47 @@ func (fs *filesystem) SetCleanup(id string, patterns []Pattern) {
|
||||
}).Log("Add pattern")
|
||||
}
|
||||
|
||||
fs.cleanupLock.Lock()
|
||||
defer fs.cleanupLock.Unlock()
|
||||
rfs.cleanupLock.Lock()
|
||||
defer rfs.cleanupLock.Unlock()
|
||||
|
||||
fs.cleanupPatterns[id] = append(fs.cleanupPatterns[id], patterns...)
|
||||
rfs.cleanupPatterns[id] = append(rfs.cleanupPatterns[id], patterns...)
|
||||
}
|
||||
|
||||
func (fs *filesystem) UnsetCleanup(id string) {
|
||||
fs.logger.Debug().WithField("id", id).Log("Remove pattern group")
|
||||
func (rfs *filesystem) UnsetCleanup(id string) {
|
||||
rfs.logger.Debug().WithField("id", id).Log("Remove pattern group")
|
||||
|
||||
fs.cleanupLock.Lock()
|
||||
defer fs.cleanupLock.Unlock()
|
||||
rfs.cleanupLock.Lock()
|
||||
defer rfs.cleanupLock.Unlock()
|
||||
|
||||
patterns := fs.cleanupPatterns[id]
|
||||
delete(fs.cleanupPatterns, id)
|
||||
patterns := rfs.cleanupPatterns[id]
|
||||
delete(rfs.cleanupPatterns, id)
|
||||
|
||||
fs.purge(patterns)
|
||||
rfs.purge(patterns)
|
||||
}
|
||||
|
||||
func (fs *filesystem) cleanup() {
|
||||
fs.cleanupLock.RLock()
|
||||
defer fs.cleanupLock.RUnlock()
|
||||
func (rfs *filesystem) cleanup() {
|
||||
rfs.cleanupLock.RLock()
|
||||
defer rfs.cleanupLock.RUnlock()
|
||||
|
||||
for _, patterns := range fs.cleanupPatterns {
|
||||
for _, patterns := range rfs.cleanupPatterns {
|
||||
for _, pattern := range patterns {
|
||||
files := fs.Filesystem.List(pattern.Pattern)
|
||||
filesAndDirs := rfs.Filesystem.List(pattern.Pattern)
|
||||
|
||||
files := []fs.FileInfo{}
|
||||
for _, f := range filesAndDirs {
|
||||
if f.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
files = append(files, f)
|
||||
}
|
||||
|
||||
sort.Slice(files, func(i, j int) bool { return files[i].ModTime().Before(files[j].ModTime()) })
|
||||
|
||||
if pattern.MaxFiles > 0 && uint(len(files)) > pattern.MaxFiles {
|
||||
for i := uint(0); i < uint(len(files))-pattern.MaxFiles; i++ {
|
||||
fs.logger.Debug().WithField("path", files[i].Name()).Log("Remove file because MaxFiles is exceeded")
|
||||
fs.Filesystem.Delete(files[i].Name())
|
||||
rfs.logger.Debug().WithField("path", files[i].Name()).Log("Remove file because MaxFiles is exceeded")
|
||||
rfs.Filesystem.Delete(files[i].Name())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,8 +155,8 @@ func (fs *filesystem) cleanup() {
|
||||
|
||||
for _, f := range files {
|
||||
if f.ModTime().Before(bestBefore) {
|
||||
fs.logger.Debug().WithField("path", f.Name()).Log("Remove file because MaxFileAge is exceeded")
|
||||
fs.Filesystem.Delete(f.Name())
|
||||
rfs.logger.Debug().WithField("path", f.Name()).Log("Remove file because MaxFileAge is exceeded")
|
||||
rfs.Filesystem.Delete(f.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -155,16 +164,17 @@ func (fs *filesystem) cleanup() {
|
||||
}
|
||||
}
|
||||
|
||||
func (fs *filesystem) purge(patterns []Pattern) (nfiles uint64) {
|
||||
func (rfs *filesystem) purge(patterns []Pattern) (nfiles uint64) {
|
||||
for _, pattern := range patterns {
|
||||
if !pattern.PurgeOnDelete {
|
||||
continue
|
||||
}
|
||||
|
||||
files := fs.Filesystem.List(pattern.Pattern)
|
||||
files := rfs.Filesystem.List(pattern.Pattern)
|
||||
sort.Slice(files, func(i, j int) bool { return len(files[i].Name()) > len(files[j].Name()) })
|
||||
for _, f := range files {
|
||||
fs.logger.Debug().WithField("path", f.Name()).Log("Purging file")
|
||||
fs.Filesystem.Delete(f.Name())
|
||||
rfs.logger.Debug().WithField("path", f.Name()).Log("Purging file")
|
||||
rfs.Filesystem.Delete(f.Name())
|
||||
nfiles++
|
||||
}
|
||||
}
|
||||
@@ -172,7 +182,7 @@ func (fs *filesystem) purge(patterns []Pattern) (nfiles uint64) {
|
||||
return
|
||||
}
|
||||
|
||||
func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) {
|
||||
func (rfs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -181,7 +191,7 @@ func (fs *filesystem) cleanupTicker(ctx context.Context, interval time.Duration)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
fs.cleanup()
|
||||
rfs.cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user