mirror of
https://github.com/datarhei/core.git
synced 2025-10-28 02:01:55 +08:00
Fix high memory consumption when copying files to S3
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/datarhei/core/v16/http/api"
|
"github.com/datarhei/core/v16/http/api"
|
||||||
"github.com/datarhei/core/v16/http/handler"
|
"github.com/datarhei/core/v16/http/handler"
|
||||||
"github.com/datarhei/core/v16/http/handler/util"
|
"github.com/datarhei/core/v16/http/handler/util"
|
||||||
|
"github.com/datarhei/core/v16/io/fs"
|
||||||
|
|
||||||
"github.com/fujiwara/shapeio"
|
"github.com/fujiwara/shapeio"
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
@@ -244,6 +245,11 @@ func (h *FSHandler) FileOperation(c echo.Context) error {
|
|||||||
|
|
||||||
defer fromFile.Close()
|
defer fromFile.Close()
|
||||||
|
|
||||||
|
fromFileStat, err := fromFile.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return api.Err(http.StatusBadRequest, "Source files with unknown size", "%s", fromFSName)
|
||||||
|
}
|
||||||
|
|
||||||
var reader io.Reader = fromFile
|
var reader io.Reader = fromFile
|
||||||
|
|
||||||
if operation.RateLimit != 0 {
|
if operation.RateLimit != 0 {
|
||||||
@@ -254,7 +260,10 @@ func (h *FSHandler) FileOperation(c echo.Context) error {
|
|||||||
reader = shapedReader
|
reader = shapedReader
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err := toFS.Handler.FS.Filesystem.WriteFileReader(toPath, reader)
|
// In case the target is S3, allow it to determine the size of the file
|
||||||
|
sizer := fs.NewReadSizer(reader, fromFileStat.Size())
|
||||||
|
|
||||||
|
_, _, err = toFS.Handler.FS.Filesystem.WriteFileReader(toPath, sizer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
toFS.Handler.FS.Filesystem.Remove(toPath)
|
toFS.Handler.FS.Filesystem.Remove(toPath)
|
||||||
return api.Err(http.StatusBadRequest, "Writing target file failed", "%s", err)
|
return api.Err(http.StatusBadRequest, "Writing target file failed", "%s", err)
|
||||||
|
|||||||
@@ -26,5 +26,6 @@ func TestMemFromDir(t *testing.T) {
|
|||||||
"/s3.go",
|
"/s3.go",
|
||||||
"/sized_test.go",
|
"/sized_test.go",
|
||||||
"/sized.go",
|
"/sized.go",
|
||||||
|
"/sizer.go",
|
||||||
}, names)
|
}, names)
|
||||||
}
|
}
|
||||||
|
|||||||
14
io/fs/s3.go
14
io/fs/s3.go
@@ -296,7 +296,13 @@ func (fs *s3Filesystem) write(path string, r io.Reader) (int64, bool, error) {
|
|||||||
overwrite = true
|
overwrite = true
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := fs.client.PutObject(ctx, fs.bucket, path, r, -1, minio.PutObjectOptions{
|
var size int64 = -1
|
||||||
|
sizer, ok := r.(Sizer)
|
||||||
|
if ok {
|
||||||
|
size = sizer.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := fs.client.PutObject(ctx, fs.bucket, path, r, size, minio.PutObjectOptions{
|
||||||
UserMetadata: map[string]string{},
|
UserMetadata: map[string]string{},
|
||||||
UserTags: map[string]string{},
|
UserTags: map[string]string{},
|
||||||
Progress: nil,
|
Progress: nil,
|
||||||
@@ -337,11 +343,13 @@ func (fs *s3Filesystem) WriteFileReader(path string, r io.Reader) (int64, bool,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fs *s3Filesystem) WriteFile(path string, data []byte) (int64, bool, error) {
|
func (fs *s3Filesystem) WriteFile(path string, data []byte) (int64, bool, error) {
|
||||||
return fs.WriteFileReader(path, bytes.NewBuffer(data))
|
rs := NewReadSizer(bytes.NewBuffer(data), int64(len(data)))
|
||||||
|
return fs.WriteFileReader(path, rs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *s3Filesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) {
|
func (fs *s3Filesystem) WriteFileSafe(path string, data []byte) (int64, bool, error) {
|
||||||
return fs.WriteFileReader(path, bytes.NewBuffer(data))
|
rs := NewReadSizer(bytes.NewBuffer(data), int64(len(data)))
|
||||||
|
return fs.WriteFileReader(path, rs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *s3Filesystem) Rename(src, dst string) error {
|
func (fs *s3Filesystem) Rename(src, dst string) error {
|
||||||
|
|||||||
36
io/fs/sizer.go
Normal file
36
io/fs/sizer.go
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
package fs
|
||||||
|
|
||||||
|
import "io"
|
||||||
|
|
||||||
|
// Sizer interface can decorate a Reader
|
||||||
|
type Sizer interface {
|
||||||
|
// Size returns the size of the object
|
||||||
|
Size() int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReadSizer interface {
|
||||||
|
io.Reader
|
||||||
|
Sizer
|
||||||
|
}
|
||||||
|
|
||||||
|
type readSizer struct {
|
||||||
|
r io.Reader
|
||||||
|
size int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReadSizer(r io.Reader, size int64) ReadSizer {
|
||||||
|
rs := &readSizer{
|
||||||
|
r: r,
|
||||||
|
size: size,
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *readSizer) Read(p []byte) (int, error) {
|
||||||
|
return rs.r.Read(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *readSizer) Size() int64 {
|
||||||
|
return rs.size
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user