Integrate S3 filesystem into http server

This commit is contained in:
Ingo Oppermann
2022-08-19 20:32:42 +03:00
parent 50e511e6c6
commit 1183de560a
4 changed files with 200 additions and 32 deletions

View File

@@ -60,22 +60,24 @@ type API interface {
}
type api struct {
restream restream.Restreamer
ffmpeg ffmpeg.FFmpeg
diskfs fs.Filesystem
memfs fs.Filesystem
rtmpserver rtmp.Server
srtserver srt.Server
metrics monitor.HistoryMonitor
prom prometheus.Metrics
service service.Service
sessions session.Registry
cache cache.Cacher
mainserver *gohttp.Server
sidecarserver *gohttp.Server
httpjwt jwt.JWT
update update.Checker
replacer replace.Replacer
restream restream.Restreamer
ffmpeg ffmpeg.FFmpeg
diskfs fs.Filesystem
memfs fs.Filesystem
s3fs fs.Filesystem
rtmpserver rtmp.Server
srtserver srt.Server
metrics monitor.HistoryMonitor
prom prometheus.Metrics
service service.Service
sessions session.Registry
sessionsLimiter net.IPLimiter
cache cache.Cacher
mainserver *gohttp.Server
sidecarserver *gohttp.Server
httpjwt jwt.JWT
update update.Checker
replacer replace.Replacer
errorChan chan error
@@ -408,6 +410,32 @@ func (a *api) start() error {
a.memfs.Resize(cfg.Storage.Memory.Size * 1024 * 1024)
}
baseS3FS := url.URL{
Scheme: "http",
Path: "/s3",
}
host, port, _ = gonet.SplitHostPort(cfg.Address)
if len(host) == 0 {
baseS3FS.Host = "localhost:" + port
} else {
baseS3FS.Host = cfg.Address
}
if cfg.Storage.Memory.Auth.Enable {
baseS3FS.User = url.UserPassword(cfg.Storage.Memory.Auth.Username, cfg.Storage.Memory.Auth.Password)
}
s3fs, err := fs.NewS3Filesystem(fs.S3Config{
Base: baseS3FS.String(),
Logger: a.log.logger.core.WithComponent("S3FS"),
})
if err != nil {
return err
}
a.s3fs = s3fs
var portrange net.Portranger
if cfg.Playout.Enable {
@@ -552,6 +580,7 @@ func (a *api) start() error {
metrics.Register(monitor.NewDiskCollector(a.diskfs.Base()))
metrics.Register(monitor.NewFilesystemCollector("diskfs", diskfs))
metrics.Register(monitor.NewFilesystemCollector("memfs", a.memfs))
metrics.Register(monitor.NewFilesystemCollector("s3fs", a.s3fs))
metrics.Register(monitor.NewRestreamCollector(a.restream))
metrics.Register(monitor.NewFFmpegCollector(a.ffmpeg))
metrics.Register(monitor.NewSessionCollector(a.sessions, []string{}))
@@ -804,6 +833,12 @@ func (a *api) start() error {
Password: cfg.Storage.Memory.Auth.Password,
Filesystem: a.memfs,
},
S3FS: http.MemFSConfig{
EnableAuth: cfg.Storage.Memory.Auth.Enable,
Username: cfg.Storage.Memory.Auth.Username,
Password: cfg.Storage.Memory.Auth.Password,
Filesystem: a.s3fs,
},
IPLimiter: iplimiter,
Profiling: cfg.Debug.Profiling,
Cors: http.CorsConfig{
@@ -866,6 +901,12 @@ func (a *api) start() error {
Password: cfg.Storage.Memory.Auth.Password,
Filesystem: a.memfs,
},
S3FS: http.MemFSConfig{
EnableAuth: cfg.Storage.Memory.Auth.Enable,
Username: cfg.Storage.Memory.Auth.Username,
Password: cfg.Storage.Memory.Auth.Password,
Filesystem: a.s3fs,
},
IPLimiter: iplimiter,
Profiling: cfg.Debug.Profiling,
Cors: http.CorsConfig{

View File

@@ -81,6 +81,7 @@ type Config struct {
MimeTypesFile string
DiskFS fs.Filesystem
MemFS MemFSConfig
S3FS MemFSConfig
IPLimiter net.IPLimiter
Profiling bool
Cors CorsConfig
@@ -115,6 +116,7 @@ type server struct {
handler struct {
about *api.AboutHandler
memfs *handler.MemFSHandler
s3fs *handler.MemFSHandler
diskfs *handler.DiskFSHandler
prometheus *handler.PrometheusHandler
profiling *handler.ProfilingHandler
@@ -128,6 +130,7 @@ type server struct {
restream *api.RestreamHandler
playout *api.PlayoutHandler
memfs *api.MemFSHandler
s3fs *api.MemFSHandler
diskfs *api.DiskFSHandler
rtmp *api.RTMPHandler
srt *api.SRTHandler
@@ -229,6 +232,16 @@ func NewServer(config Config) (Server, error) {
)
}
if config.S3FS.Filesystem != nil {
s.v3handler.s3fs = api.NewMemFS(
config.S3FS.Filesystem,
)
s.handler.s3fs = handler.NewMemFS(
config.S3FS.Filesystem,
)
}
if config.Prometheus != nil {
s.handler.prometheus = handler.NewPrometheus(
config.Prometheus.HTTPHandler(),
@@ -307,6 +320,7 @@ func NewServer(config Config) (Server, error) {
"/": config.Cors.Origins,
"/api": {"*"},
"/memfs": config.Cors.Origins,
"/s3": config.Cors.Origins,
},
}); err != nil {
return nil, err
@@ -491,6 +505,46 @@ func (s *server) setRoutes() {
}
}
// S3 FS
if s.handler.s3fs != nil {
memfs := s.router.Group("/s3fs/*")
memfs.Use(mwmime.NewWithConfig(mwmime.Config{
MimeTypesFile: s.mimeTypesFile,
DefaultContentType: "application/data",
}))
memfs.Use(mwgzip.NewWithConfig(mwgzip.Config{
Level: mwgzip.BestSpeed,
MinLength: 1000,
ContentTypes: s.gzip.mimetypes,
}))
if s.middleware.session != nil {
memfs.Use(s.middleware.session)
}
memfs.HEAD("", s.handler.s3fs.GetFile)
memfs.GET("", s.handler.s3fs.GetFile)
var authmw echo.MiddlewareFunc
if s.memfs.enableAuth {
authmw = middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) {
if username == s.memfs.username && password == s.memfs.password {
return true, nil
}
return false, nil
})
memfs.POST("", s.handler.s3fs.PutFile, authmw)
memfs.PUT("", s.handler.s3fs.PutFile, authmw)
memfs.DELETE("", s.handler.s3fs.DeleteFile, authmw)
} else {
memfs.POST("", s.handler.s3fs.PutFile)
memfs.PUT("", s.handler.s3fs.PutFile)
memfs.DELETE("", s.handler.s3fs.DeleteFile)
}
}
// Prometheus metrics
if s.handler.prometheus != nil {
metrics := s.router.Group("/metrics")
@@ -597,6 +651,18 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
}
}
// v3 S3 FS
if s.v3handler.s3fs != nil {
v3.GET("/fs/s3", s.v3handler.s3fs.ListFiles)
v3.GET("/fs/s3/*", s.v3handler.s3fs.GetFile)
if !s.readOnly {
v3.DELETE("/fs/s3/*", s.v3handler.s3fs.DeleteFile)
v3.PUT("/fs/s3/*", s.v3handler.s3fs.PutFile)
v3.PATCH("/fs/s3/*", s.v3handler.s3fs.PatchFile)
}
}
// v3 Disk FS
v3.GET("/fs/disk", s.v3handler.diskfs.ListFiles)
v3.GET("/fs/disk/*", s.v3handler.diskfs.GetFile, mwmime.NewWithConfig(mwmime.Config{

View File

@@ -67,7 +67,7 @@ type Filesystem interface {
Store(path string, r io.Reader) (int64, bool, error)
// Delete removes a file at the given path from the filesystem. Returns the size of
// the remove file in bytes. The size is negative if the file doesn't exist.
// the removed file in bytes. The size is negative if the file doesn't exist.
Delete(path string) int64
// DeleteAll removes all files from the filesystem. Returns the size of the

View File

@@ -6,11 +6,13 @@ import (
"io"
"time"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
type S3FSConfig struct {
type S3Config struct {
Base string
Endpoint string
AccessKeyID string
@@ -18,6 +20,8 @@ type S3FSConfig struct {
Region string
Bucket string
UseSSL bool
Logger log.Logger
}
type s3fs struct {
@@ -31,9 +35,11 @@ type s3fs struct {
useSSL bool
client *minio.Client
logger log.Logger
}
func NewS3FS(config S3FSConfig) (Filesystem, error) {
func NewS3Filesystem(config S3Config) (Filesystem, error) {
fs := &s3fs{
base: config.Base,
endpoint: config.Endpoint,
@@ -42,6 +48,11 @@ func NewS3FS(config S3FSConfig) (Filesystem, error) {
region: config.Region,
bucket: config.Bucket,
useSSL: config.UseSSL,
logger: config.Logger,
}
if fs.logger == nil {
fs.logger = log.New("")
}
client, err := minio.New(fs.endpoint, &minio.Options{
@@ -54,6 +65,29 @@ func NewS3FS(config S3FSConfig) (Filesystem, error) {
return nil, err
}
fs.logger = fs.logger.WithFields(log.Fields{
"bucket": fs.bucket,
"region": fs.region,
"endpoint": fs.endpoint,
})
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()
err = client.MakeBucket(ctx, fs.bucket, minio.MakeBucketOptions{Region: fs.region})
if err != nil {
exists, errBucketExists := client.BucketExists(ctx, fs.bucket)
if errBucketExists != nil {
return nil, err
}
if exists {
fs.logger.Debug().Log("Bucket already exists")
}
} else {
fs.logger.Debug().Log("Bucket created")
}
fs.client = client
return fs, nil
@@ -70,7 +104,15 @@ func (fs *s3fs) Rebase(base string) error {
}
func (fs *s3fs) Size() (int64, int64) {
return -1, -1
size := int64(0)
files := fs.List("")
for _, file := range files {
size += file.Size()
}
return size, -1
}
func (fs *s3fs) Resize(size int64) {}
@@ -80,18 +122,15 @@ func (fs *s3fs) Files() int64 {
defer cancel()
ch := fs.client.ListObjects(ctx, fs.bucket, minio.ListObjectsOptions{
WithVersions: false,
WithMetadata: false,
Prefix: "",
Recursive: true,
MaxKeys: 0,
StartAfter: "",
UseV1: false,
Recursive: true,
})
nfiles := int64(0)
for range ch {
for object := range ch {
if object.Err != nil {
fs.logger.WithError(object.Err).Log("Listing object failed")
}
nfiles++
}
@@ -107,13 +146,14 @@ func (fs *s3fs) Open(path string) File {
defer cancel()
object, err := fs.client.GetObject(ctx, fs.bucket, path, minio.GetObjectOptions{})
if err != nil {
fs.logger.Debug().WithField("key", path).Log("Not found")
return nil
}
stat, err := object.Stat()
if err != nil {
fs.logger.Debug().WithField("key", path).Log("Stat failed")
return nil
}
@@ -124,6 +164,8 @@ func (fs *s3fs) Open(path string) File {
lastModified: stat.LastModified,
}
fs.logger.Debug().WithField("key", stat.Key).Log("Opened")
return file
}
@@ -161,9 +203,15 @@ func (fs *s3fs) Store(path string, r io.Reader) (int64, bool, error) {
Internal: minio.AdvancedPutOptions{},
})
if err != nil {
fs.logger.WithError(err).WithField("key", path).Log("Failed to store file")
return -1, false, err
}
fs.logger.Debug().WithFields(log.Fields{
"key": path,
"overwrite": overwrite,
}).Log("Stored")
return info.Size, overwrite, nil
}
@@ -173,6 +221,7 @@ func (fs *s3fs) Delete(path string) int64 {
stat, err := fs.client.StatObject(ctx, fs.bucket, path, minio.StatObjectOptions{})
if err != nil {
fs.logger.Debug().WithField("key", path).Log("Not found")
return -1
}
@@ -180,9 +229,12 @@ func (fs *s3fs) Delete(path string) int64 {
GovernanceBypass: true,
})
if err != nil {
fs.logger.WithError(err).WithField("key", stat.Key).Log("Failed to delete file")
return -1
}
fs.logger.Debug().WithField("key", stat.Key).Log("Deleted")
return stat.Size
}
@@ -202,7 +254,7 @@ func (fs *s3fs) DeleteAll() int64 {
Recursive: true,
}) {
if object.Err != nil {
//log.Fatalln(object.Err)
fs.logger.WithError(object.Err).Log("Listing object failed")
continue
}
totalSize += object.Size
@@ -213,9 +265,11 @@ func (fs *s3fs) DeleteAll() int64 {
for err := range fs.client.RemoveObjects(context.Background(), fs.bucket, objectsCh, minio.RemoveObjectsOptions{
GovernanceBypass: true,
}) {
fmt.Println("Error detected during deletion: ", err)
fs.logger.WithError(err.Err).WithField("key", err.ObjectName).Log("Deleting object failed")
}
fs.logger.Debug().Log("Deleted all files")
return totalSize
}
@@ -237,7 +291,14 @@ func (fs *s3fs) List(pattern string) []FileInfo {
for object := range ch {
if object.Err != nil {
return nil
fs.logger.WithError(object.Err).Log("Listing object failed")
continue
}
if len(pattern) != 0 {
if ok, _ := glob.Match(pattern, object.Key, '/'); !ok {
continue
}
}
f := &s3FileInfo{