diff --git a/app/api/api.go b/app/api/api.go index b5b95e26..be702127 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -60,22 +60,24 @@ type API interface { } type api struct { - restream restream.Restreamer - ffmpeg ffmpeg.FFmpeg - diskfs fs.Filesystem - memfs fs.Filesystem - rtmpserver rtmp.Server - srtserver srt.Server - metrics monitor.HistoryMonitor - prom prometheus.Metrics - service service.Service - sessions session.Registry - cache cache.Cacher - mainserver *gohttp.Server - sidecarserver *gohttp.Server - httpjwt jwt.JWT - update update.Checker - replacer replace.Replacer + restream restream.Restreamer + ffmpeg ffmpeg.FFmpeg + diskfs fs.Filesystem + memfs fs.Filesystem + s3fs fs.Filesystem + rtmpserver rtmp.Server + srtserver srt.Server + metrics monitor.HistoryMonitor + prom prometheus.Metrics + service service.Service + sessions session.Registry + sessionsLimiter net.IPLimiter + cache cache.Cacher + mainserver *gohttp.Server + sidecarserver *gohttp.Server + httpjwt jwt.JWT + update update.Checker + replacer replace.Replacer errorChan chan error @@ -408,6 +410,32 @@ func (a *api) start() error { a.memfs.Resize(cfg.Storage.Memory.Size * 1024 * 1024) } + baseS3FS := url.URL{ + Scheme: "http", + Path: "/s3", + } + + host, port, _ = gonet.SplitHostPort(cfg.Address) + if len(host) == 0 { + baseS3FS.Host = "localhost:" + port + } else { + baseS3FS.Host = cfg.Address + } + + if cfg.Storage.Memory.Auth.Enable { + baseS3FS.User = url.UserPassword(cfg.Storage.Memory.Auth.Username, cfg.Storage.Memory.Auth.Password) + } + + s3fs, err := fs.NewS3Filesystem(fs.S3Config{ + Base: baseS3FS.String(), + Logger: a.log.logger.core.WithComponent("S3FS"), + }) + if err != nil { + return err + } + + a.s3fs = s3fs + var portrange net.Portranger if cfg.Playout.Enable { @@ -552,6 +580,7 @@ func (a *api) start() error { metrics.Register(monitor.NewDiskCollector(a.diskfs.Base())) metrics.Register(monitor.NewFilesystemCollector("diskfs", diskfs)) metrics.Register(monitor.NewFilesystemCollector("memfs", a.memfs)) + metrics.Register(monitor.NewFilesystemCollector("s3fs", a.s3fs)) metrics.Register(monitor.NewRestreamCollector(a.restream)) metrics.Register(monitor.NewFFmpegCollector(a.ffmpeg)) metrics.Register(monitor.NewSessionCollector(a.sessions, []string{})) @@ -804,6 +833,12 @@ func (a *api) start() error { Password: cfg.Storage.Memory.Auth.Password, Filesystem: a.memfs, }, + S3FS: http.MemFSConfig{ + EnableAuth: cfg.Storage.Memory.Auth.Enable, + Username: cfg.Storage.Memory.Auth.Username, + Password: cfg.Storage.Memory.Auth.Password, + Filesystem: a.s3fs, + }, IPLimiter: iplimiter, Profiling: cfg.Debug.Profiling, Cors: http.CorsConfig{ @@ -866,6 +901,12 @@ func (a *api) start() error { Password: cfg.Storage.Memory.Auth.Password, Filesystem: a.memfs, }, + S3FS: http.MemFSConfig{ + EnableAuth: cfg.Storage.Memory.Auth.Enable, + Username: cfg.Storage.Memory.Auth.Username, + Password: cfg.Storage.Memory.Auth.Password, + Filesystem: a.s3fs, + }, IPLimiter: iplimiter, Profiling: cfg.Debug.Profiling, Cors: http.CorsConfig{ diff --git a/http/server.go b/http/server.go index 439df8ab..1353d043 100644 --- a/http/server.go +++ b/http/server.go @@ -81,6 +81,7 @@ type Config struct { MimeTypesFile string DiskFS fs.Filesystem MemFS MemFSConfig + S3FS MemFSConfig IPLimiter net.IPLimiter Profiling bool Cors CorsConfig @@ -115,6 +116,7 @@ type server struct { handler struct { about *api.AboutHandler memfs *handler.MemFSHandler + s3fs *handler.MemFSHandler diskfs *handler.DiskFSHandler prometheus *handler.PrometheusHandler profiling *handler.ProfilingHandler @@ -128,6 +130,7 @@ type server struct { restream *api.RestreamHandler playout *api.PlayoutHandler memfs *api.MemFSHandler + s3fs *api.MemFSHandler diskfs *api.DiskFSHandler rtmp *api.RTMPHandler srt *api.SRTHandler @@ -229,6 +232,16 @@ func NewServer(config Config) (Server, error) { ) } + if config.S3FS.Filesystem != nil { + s.v3handler.s3fs = api.NewMemFS( + config.S3FS.Filesystem, + ) + + s.handler.s3fs = handler.NewMemFS( + config.S3FS.Filesystem, + ) + } + if config.Prometheus != nil { s.handler.prometheus = handler.NewPrometheus( config.Prometheus.HTTPHandler(), @@ -307,6 +320,7 @@ func NewServer(config Config) (Server, error) { "/": config.Cors.Origins, "/api": {"*"}, "/memfs": config.Cors.Origins, + "/s3": config.Cors.Origins, }, }); err != nil { return nil, err @@ -491,6 +505,46 @@ func (s *server) setRoutes() { } } + // S3 FS + if s.handler.s3fs != nil { + memfs := s.router.Group("/s3fs/*") + memfs.Use(mwmime.NewWithConfig(mwmime.Config{ + MimeTypesFile: s.mimeTypesFile, + DefaultContentType: "application/data", + })) + memfs.Use(mwgzip.NewWithConfig(mwgzip.Config{ + Level: mwgzip.BestSpeed, + MinLength: 1000, + ContentTypes: s.gzip.mimetypes, + })) + if s.middleware.session != nil { + memfs.Use(s.middleware.session) + } + + memfs.HEAD("", s.handler.s3fs.GetFile) + memfs.GET("", s.handler.s3fs.GetFile) + + var authmw echo.MiddlewareFunc + + if s.memfs.enableAuth { + authmw = middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) { + if username == s.memfs.username && password == s.memfs.password { + return true, nil + } + + return false, nil + }) + + memfs.POST("", s.handler.s3fs.PutFile, authmw) + memfs.PUT("", s.handler.s3fs.PutFile, authmw) + memfs.DELETE("", s.handler.s3fs.DeleteFile, authmw) + } else { + memfs.POST("", s.handler.s3fs.PutFile) + memfs.PUT("", s.handler.s3fs.PutFile) + memfs.DELETE("", s.handler.s3fs.DeleteFile) + } + } + // Prometheus metrics if s.handler.prometheus != nil { metrics := s.router.Group("/metrics") @@ -597,6 +651,18 @@ func (s *server) setRoutesV3(v3 *echo.Group) { } } + // v3 S3 FS + if s.v3handler.s3fs != nil { + v3.GET("/fs/s3", s.v3handler.s3fs.ListFiles) + v3.GET("/fs/s3/*", s.v3handler.s3fs.GetFile) + + if !s.readOnly { + v3.DELETE("/fs/s3/*", s.v3handler.s3fs.DeleteFile) + v3.PUT("/fs/s3/*", s.v3handler.s3fs.PutFile) + v3.PATCH("/fs/s3/*", s.v3handler.s3fs.PatchFile) + } + } + // v3 Disk FS v3.GET("/fs/disk", s.v3handler.diskfs.ListFiles) v3.GET("/fs/disk/*", s.v3handler.diskfs.GetFile, mwmime.NewWithConfig(mwmime.Config{ diff --git a/io/fs/fs.go b/io/fs/fs.go index d1923c47..fd45c4a2 100644 --- a/io/fs/fs.go +++ b/io/fs/fs.go @@ -67,7 +67,7 @@ type Filesystem interface { Store(path string, r io.Reader) (int64, bool, error) // Delete removes a file at the given path from the filesystem. Returns the size of - // the remove file in bytes. The size is negative if the file doesn't exist. + // the removed file in bytes. The size is negative if the file doesn't exist. Delete(path string) int64 // DeleteAll removes all files from the filesystem. Returns the size of the diff --git a/io/fs/s3.go b/io/fs/s3.go index 53c16548..a30eb76c 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -6,11 +6,13 @@ import ( "io" "time" + "github.com/datarhei/core/v16/glob" + "github.com/datarhei/core/v16/log" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) -type S3FSConfig struct { +type S3Config struct { Base string Endpoint string AccessKeyID string @@ -18,6 +20,8 @@ type S3FSConfig struct { Region string Bucket string UseSSL bool + + Logger log.Logger } type s3fs struct { @@ -31,9 +35,11 @@ type s3fs struct { useSSL bool client *minio.Client + + logger log.Logger } -func NewS3FS(config S3FSConfig) (Filesystem, error) { +func NewS3Filesystem(config S3Config) (Filesystem, error) { fs := &s3fs{ base: config.Base, endpoint: config.Endpoint, @@ -42,6 +48,11 @@ func NewS3FS(config S3FSConfig) (Filesystem, error) { region: config.Region, bucket: config.Bucket, useSSL: config.UseSSL, + logger: config.Logger, + } + + if fs.logger == nil { + fs.logger = log.New("") } client, err := minio.New(fs.endpoint, &minio.Options{ @@ -54,6 +65,29 @@ func NewS3FS(config S3FSConfig) (Filesystem, error) { return nil, err } + fs.logger = fs.logger.WithFields(log.Fields{ + "bucket": fs.bucket, + "region": fs.region, + "endpoint": fs.endpoint, + }) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + defer cancel() + + err = client.MakeBucket(ctx, fs.bucket, minio.MakeBucketOptions{Region: fs.region}) + if err != nil { + exists, errBucketExists := client.BucketExists(ctx, fs.bucket) + if errBucketExists != nil { + return nil, err + } + + if exists { + fs.logger.Debug().Log("Bucket already exists") + } + } else { + fs.logger.Debug().Log("Bucket created") + } + fs.client = client return fs, nil @@ -70,7 +104,15 @@ func (fs *s3fs) Rebase(base string) error { } func (fs *s3fs) Size() (int64, int64) { - return -1, -1 + size := int64(0) + + files := fs.List("") + + for _, file := range files { + size += file.Size() + } + + return size, -1 } func (fs *s3fs) Resize(size int64) {} @@ -80,18 +122,15 @@ func (fs *s3fs) Files() int64 { defer cancel() ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{ - WithVersions: false, - WithMetadata: false, - Prefix: "", - Recursive: true, - MaxKeys: 0, - StartAfter: "", - UseV1: false, + Recursive: true, }) nfiles := int64(0) - for range ch { + for object := range ch { + if object.Err != nil { + fs.logger.WithError(object.Err).Log("Listing object failed") + } nfiles++ } @@ -107,13 +146,14 @@ func (fs *s3fs) Open(path string) File { defer cancel() object, err := fs.client.GetObject(ctx, fs.bucket, path, minio.GetObjectOptions{}) - if err != nil { + fs.logger.Debug().WithField("key", path).Log("Not found") return nil } stat, err := object.Stat() if err != nil { + fs.logger.Debug().WithField("key", path).Log("Stat failed") return nil } @@ -124,6 +164,8 @@ func (fs *s3fs) Open(path string) File { lastModified: stat.LastModified, } + fs.logger.Debug().WithField("key", stat.Key).Log("Opened") + return file } @@ -161,9 +203,15 @@ func (fs *s3fs) Store(path string, r io.Reader) (int64, bool, error) { Internal: minio.AdvancedPutOptions{}, }) if err != nil { + fs.logger.WithError(err).WithField("key", path).Log("Failed to store file") return -1, false, err } + fs.logger.Debug().WithFields(log.Fields{ + "key": path, + "overwrite": overwrite, + }).Log("Stored") + return info.Size, overwrite, nil } @@ -173,6 +221,7 @@ func (fs *s3fs) Delete(path string) int64 { stat, err := fs.client.StatObject(ctx, fs.bucket, path, minio.StatObjectOptions{}) if err != nil { + fs.logger.Debug().WithField("key", path).Log("Not found") return -1 } @@ -180,9 +229,12 @@ func (fs *s3fs) Delete(path string) int64 { GovernanceBypass: true, }) if err != nil { + fs.logger.WithError(err).WithField("key", stat.Key).Log("Failed to delete file") return -1 } + fs.logger.Debug().WithField("key", stat.Key).Log("Deleted") + return stat.Size } @@ -202,7 +254,7 @@ func (fs *s3fs) DeleteAll() int64 { Recursive: true, }) { if object.Err != nil { - //log.Fatalln(object.Err) + fs.logger.WithError(object.Err).Log("Listing object failed") continue } totalSize += object.Size @@ -213,9 +265,11 @@ func (fs *s3fs) DeleteAll() int64 { for err := range fs.client.RemoveObjects(context.Background(), fs.bucket, objectsCh, minio.RemoveObjectsOptions{ GovernanceBypass: true, }) { - fmt.Println("Error detected during deletion: ", err) + fs.logger.WithError(err.Err).WithField("key", err.ObjectName).Log("Deleting object failed") } + fs.logger.Debug().Log("Deleted all files") + return totalSize } @@ -237,7 +291,14 @@ func (fs *s3fs) List(pattern string) []FileInfo { for object := range ch { if object.Err != nil { - return nil + fs.logger.WithError(object.Err).Log("Listing object failed") + continue + } + + if len(pattern) != 0 { + if ok, _ := glob.Match(pattern, object.Key, '/'); !ok { + continue + } } f := &s3FileInfo{