diff --git a/io/fs/mem.go b/io/fs/mem.go index 7edfa5e6..97b45318 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -105,17 +105,60 @@ func (f *memFile) Seek(offset int64, whence int) (int64, error) { } func (f *memFile) Close() error { - if f.data == nil { + if f.r == nil { return io.EOF } f.r = nil - f.data.Reset() - f.data = nil return nil } +func (f *memFile) free() { + f.Close() + + if f.data == nil { + return + } + + pool.Put(f.data) + + f.data = nil +} + +type fileDataPool struct { + pool sync.Pool +} + +var pool *fileDataPool = nil + +func NewFileDataPool() *fileDataPool { + p := &fileDataPool{ + pool: sync.Pool{ + New: func() any { + return &bytes.Buffer{} + }, + }, + } + + return p +} + +func (p *fileDataPool) Get() *bytes.Buffer { + buf := p.pool.Get().(*bytes.Buffer) + buf.Reset() + + return buf +} + +func (p *fileDataPool) Put(buf *bytes.Buffer) { + p.pool.Put(buf) +} + +func init() { + pool = NewFileDataPool() +} + type memFilesystem struct { metadata map[string]string metaLock sync.RWMutex @@ -315,7 +358,7 @@ func (fs *memFilesystem) Files() int64 { func (fs *memFilesystem) Open(path string) File { path = fs.cleanPath(path) - file, ok := fs.storage.LoadAndCopy(path) + file, ok := fs.storage.Load(path) if !ok { return nil } @@ -323,24 +366,26 @@ func (fs *memFilesystem) Open(path string) File { newFile := &memFile{ memFileInfo: memFileInfo{ name: file.name, + size: file.size, + dir: file.dir, lastMod: file.lastMod, linkTo: file.linkTo, }, + data: file.data, } if len(file.linkTo) != 0 { - file.Close() - - file, ok = fs.storage.LoadAndCopy(file.linkTo) + file, ok := fs.storage.Load(file.linkTo) if !ok { return nil } + + newFile.lastMod = file.lastMod + newFile.data = file.data + newFile.size = file.size } - newFile.lastMod = file.lastMod - newFile.data = file.data - newFile.size = file.size - newFile.r = bytes.NewReader(file.data.Bytes()) + newFile.r = bytes.NewReader(newFile.data.Bytes()) return newFile } @@ -348,22 +393,22 @@ func (fs *memFilesystem) Open(path string) File { func (fs *memFilesystem) ReadFile(path string) ([]byte, error) { path = fs.cleanPath(path) - file, ok := fs.storage.LoadAndCopy(path) + file, ok := fs.storage.Load(path) if !ok { return nil, ErrNotExist } if len(file.linkTo) != 0 { - file.Close() - - file, ok = fs.storage.LoadAndCopy(file.linkTo) + file, ok = fs.storage.Load(file.linkTo) if !ok { return nil, ErrNotExist } } - defer file.Close() - return file.data.Bytes(), nil + data := make([]byte, file.data.Len()) + copy(data, file.data.Bytes()) + + return data, nil } func (fs *memFilesystem) Symlink(oldname, newname string) error { @@ -403,7 +448,7 @@ func (fs *memFilesystem) Symlink(oldname, newname string) error { defer fs.sizeLock.Unlock() if replaced { - oldFile.Close() + oldFile.free() fs.currentSize -= oldFile.size } @@ -456,7 +501,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) size: 0, lastMod: time.Now(), }, - data: &bytes.Buffer{}, + data: pool.Get(), } if sizeHint > 0 && sizeHint < 5*1024*1024 { @@ -471,7 +516,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) "error": err, }).Warn().Log("Incomplete file") - newFile.Close() + newFile.free() return -1, false, fmt.Errorf("incomplete file") } @@ -488,7 +533,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) defer fs.sizeLock.Unlock() if replace { - oldFile.Close() + oldFile.free() fs.currentSize -= oldFile.size } @@ -521,13 +566,26 @@ func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, e func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) { path = fs.cleanPath(path) - file, hasFile := fs.storage.LoadAndCopy(path) + file, hasFile := fs.storage.Load(path) if !hasFile { size, _, err := fs.WriteFileReader(path, r, sizeHint) return size, err } - size, err := copyToBufferFromReader(file.data, r, 8*1024) + newFile := &memFile{ + memFileInfo: memFileInfo{ + name: path, + dir: false, + size: 0, + lastMod: time.Now(), + }, + data: pool.Get(), + } + + newFile.data.Grow(file.data.Len()) + newFile.data.Write(file.data.Bytes()) + + size, err := copyToBufferFromReader(newFile.data, r, 8*1024) if err != nil { fs.logger.WithFields(log.Fields{ "path": path, @@ -535,18 +593,22 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int "error": err, }).Warn().Log("Incomplete file") - file.Close() + newFile.free() return -1, fmt.Errorf("incomplete file") } file.size += size - fs.storage.Store(path, file) + oldFile, replace := fs.storage.Store(path, newFile) fs.sizeLock.Lock() defer fs.sizeLock.Unlock() + if replace { + oldFile.free() + } + fs.currentSize += size fs.logger.Debug().WithFields(log.Fields{ @@ -583,7 +645,7 @@ func (fs *memFilesystem) Purge(size int64) int64 { fs.currentSize -= f.size fs.sizeLock.Unlock() - f.Close() + f.free() fs.logger.WithFields(log.Fields{ "path": f.name, @@ -643,7 +705,7 @@ func (fs *memFilesystem) Rename(src, dst string) error { defer fs.sizeLock.Unlock() if replace { - dstFile.Close() + dstFile.free() fs.currentSize -= dstFile.size } @@ -663,13 +725,12 @@ func (fs *memFilesystem) Copy(src, dst string) error { return os.ErrInvalid } - srcFile, ok := fs.storage.LoadAndCopy(src) + srcFile, ok := fs.storage.Load(src) if !ok { return ErrNotExist } if srcFile.dir { - srcFile.Close() return ErrNotExist } @@ -680,9 +741,12 @@ func (fs *memFilesystem) Copy(src, dst string) error { size: srcFile.size, lastMod: time.Now(), }, - data: srcFile.data, + data: pool.Get(), } + dstFile.data.Grow(srcFile.data.Len()) + dstFile.data.Write(srcFile.data.Bytes()) + f, replace := fs.storage.Store(dst, dstFile) if !replace { @@ -693,7 +757,7 @@ func (fs *memFilesystem) Copy(src, dst string) error { defer fs.sizeLock.Unlock() if replace { - f.Close() + f.free() fs.currentSize -= f.size } @@ -761,7 +825,7 @@ func (fs *memFilesystem) Remove(path string) int64 { func (fs *memFilesystem) remove(path string) int64 { file, ok := fs.storage.Delete(path) if ok { - file.Close() + file.free() fs.dirs.Remove(path) @@ -851,7 +915,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string, fs.dirs.Remove(file.name) - file.Close() + file.free() } fs.sizeLock.Lock() diff --git a/io/fs/mem_storage.go b/io/fs/mem_storage.go index 044b375e..8267660a 100644 --- a/io/fs/mem_storage.go +++ b/io/fs/mem_storage.go @@ -21,10 +21,6 @@ type memStorage interface { // i.e. all changes to the file will be reflected on the storage. Load(key string) (value *memFile, ok bool) - // LoadAndCopy loads a file from the storage. The returned file is a copy - // and can be modified without modifying the file on the storage. - LoadAndCopy(key string) (value *memFile, ok bool) - // Has checks whether a file exists at path. Has(key string) bool @@ -68,33 +64,6 @@ func (m *mapOfStorage) Load(key string) (*memFile, bool) { return m.files.Load(key) } -func (m *mapOfStorage) LoadAndCopy(key string) (*memFile, bool) { - token := m.lock.RLock() - defer m.lock.RUnlock(token) - - v, ok := m.files.Load(key) - if !ok { - return nil, false - } - - f := &memFile{ - memFileInfo: memFileInfo{ - name: v.name, - size: v.size, - dir: v.dir, - lastMod: v.lastMod, - linkTo: v.linkTo, - }, - r: nil, - } - - if v.data != nil { - f.data = bytes.NewBuffer(v.data.Bytes()) - } - - return f, true -} - func (m *mapOfStorage) Has(key string) bool { token := m.lock.RLock() defer m.lock.RUnlock(token) diff --git a/io/fs/mem_test.go b/io/fs/mem_test.go index 1e9f43fb..63a8f0f3 100644 --- a/io/fs/mem_test.go +++ b/io/fs/mem_test.go @@ -48,6 +48,34 @@ func TestWriteWhileRead(t *testing.T) { require.Equal(t, []byte("xxxxx"), data) } +func TestCopy(t *testing.T) { + fs, err := NewMemFilesystem(MemConfig{}) + require.NoError(t, err) + + _, _, err = fs.WriteFile("/foobar", []byte("xxxxx")) + require.NoError(t, err) + + data, err := fs.ReadFile("/foobar") + require.NoError(t, err) + + require.Equal(t, []byte("xxxxx"), data) + + err = fs.Copy("/foobar", "/barfoo") + require.NoError(t, err) + + data, err = fs.ReadFile("/barfoo") + require.NoError(t, err) + + require.Equal(t, []byte("xxxxx"), data) + + fs.Remove("/foobar") + + data, err = fs.ReadFile("/barfoo") + require.NoError(t, err) + + require.Equal(t, []byte("xxxxx"), data) +} + func BenchmarkMemStorages(b *testing.B) { storages := []string{ "map", diff --git a/io/fs/memtest/.gitignore b/io/fs/memtest/.gitignore new file mode 100644 index 00000000..f43fea82 --- /dev/null +++ b/io/fs/memtest/.gitignore @@ -0,0 +1 @@ +memtest diff --git a/io/fs/memtest/memtest.go b/io/fs/memtest/memtest.go new file mode 100644 index 00000000..2ef879bb --- /dev/null +++ b/io/fs/memtest/memtest.go @@ -0,0 +1,220 @@ +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "io" + "log" + gorand "math/rand/v2" + "os" + "os/signal" + "runtime" + "runtime/debug" + "strconv" + "sync" + "time" + + "github.com/datarhei/core/v16/io/fs" + "github.com/datarhei/core/v16/math/rand" + + "github.com/google/gops/agent" +) + +func main() { + oStorage := "mapof" + oWriters := 500 + oReaders := 1000 + oFiles := 15 + oInterval := 1 // seconds + oSize := 2 // megabytes + oLimit := false + + flag.StringVar(&oStorage, "storage", "mapof", "type of mem storage implementation (mapof, map, swiss)") + flag.IntVar(&oWriters, "writers", 500, "number of concurrent writers") + flag.IntVar(&oReaders, "readers", 1000, "number of concurrent readers") + flag.IntVar(&oFiles, "files", 15, "number of files to keep per writer") + flag.IntVar(&oInterval, "interval", 1, "interval for writing files in seconds") + flag.IntVar(&oSize, "size", 2048, "size of files to write in kilobytes") + flag.BoolVar(&oLimit, "limit", false, "set memory limit") + + flag.Parse() + + estimatedSize := float64(oWriters*oFiles*oSize) / 1024 / 1024 + + fmt.Printf("Expecting effective memory consumption of %.1fGB\n", estimatedSize) + + if oLimit { + fmt.Printf("Setting memory limit to %.1fGB\n", estimatedSize*1.5) + debug.SetMemoryLimit(int64(estimatedSize * 1.5)) + } + + memfs, err := fs.NewMemFilesystem(fs.MemConfig{ + Storage: oStorage, + }) + + if err != nil { + log.Fatalf("acquiring new memfs: %s", err.Error()) + } + + err = agent.Listen(agent.Options{ + Addr: ":9000", + ReuseSocketAddrAndPort: true, + }) + + if err != nil { + log.Fatalf("starting agent: %s", err.Error()) + } + + fmt.Printf("Started agent on :9000\n") + + ctx, cancel := context.WithCancel(context.Background()) + + wgWriter := sync.WaitGroup{} + + for i := 0; i < oWriters; i++ { + fmt.Printf("%4d / %4d writer started\r", i+1, oWriters) + + wgWriter.Add(1) + + go func(ctx context.Context, memfs fs.Filesystem, index int, nfiles int64, interval time.Duration) { + defer wgWriter.Done() + + jitter := gorand.IntN(200) + interval += time.Duration(jitter) * time.Millisecond + + sequence := int64(0) + + buf := bytes.NewBufferString(rand.StringAlphanumeric(oSize * (1024 + jitter - 100))) + r := bytes.NewReader(buf.Bytes()) + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + path := fmt.Sprintf("/foobar/test_%d_%06d.dat", index, sequence) + + // Write file to memfs + r.Seek(0, io.SeekStart) + memfs.WriteFileReader(path, r, -1) + + // Delete file from memfs + if sequence-nfiles >= 0 { + path = fmt.Sprintf("/foobar/test_%d_%06d.dat", index, sequence-nfiles) + memfs.Remove(path) + } + + path = fmt.Sprintf("/foobar/test_%d.last", index) + memfs.WriteFile(path, []byte(strconv.FormatInt(sequence, 10))) + + sequence++ + } + } + }(ctx, memfs, i, int64(oFiles), time.Duration(oInterval)*time.Second) + } + + fmt.Printf("\n") + + wgReader := sync.WaitGroup{} + + if oReaders > 0 { + for i := 0; i < oReaders; i++ { + fmt.Printf("%4d / %4d reader started\r", i+1, oReaders) + + wgReader.Add(1) + + go func(ctx context.Context, memfs fs.Filesystem, interval time.Duration) { + defer wgReader.Done() + + buf := bytes.Buffer{} + + jitter := gorand.IntN(200) + interval += time.Duration(jitter) * time.Millisecond + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + index := gorand.IntN(oWriters) + + path := fmt.Sprintf("/foobar/test_%d.list", index) + data, err := memfs.ReadFile(path) + if err != nil { + continue + } + + sequence, err := strconv.ParseUint(string(data), 10, 64) + if err != nil { + continue + } + + path = fmt.Sprintf("/foobar/test_%d_%06d.dat", index, sequence) + file := memfs.Open(path) + + buf.ReadFrom(file) + buf.Reset() + } + } + }(ctx, memfs, time.Duration(oInterval)*time.Second) + } + + fmt.Printf("\n") + } + + go func(ctx context.Context, memfs fs.Filesystem) { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + nMallocs := uint64(0) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m := runtime.MemStats{} + runtime.ReadMemStats(&m) + + size, _ := memfs.Size() + fmt.Printf("%5.1fGB ", float64(size)/1024/1024/1024) + + listfiles := 0 + listsize := int64(0) + files := memfs.List("/", fs.ListOptions{}) + for _, f := range files { + listsize += f.Size() + } + listfiles = len(files) + + fmt.Printf("(%7d files with %5.1fGB) ", listfiles, float64(listsize)/1024/1024/1024) + + fmt.Printf("alloc=%5.1fGB (%8.1fGB) sys=%5.1fGB idle=%5.1fGB inuse=%5.1fGB mallocs=%d objects=%d\n", float64(m.HeapAlloc)/1024/1024/1024, float64(m.TotalAlloc)/1024/1024/1024, float64(m.HeapSys)/1024/1024/1024, float64(m.HeapIdle)/1024/1024/1024, float64(m.HeapInuse)/1024/1024/1024, m.Mallocs-nMallocs, m.Mallocs-m.Frees) + + nMallocs = m.Mallocs + } + } + }(ctx, memfs) + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt) + <-quit + + cancel() + + fmt.Printf("Waiting for readers to stop ...\n") + wgReader.Wait() + + fmt.Printf("Waiting for writers to stop ...\n") + wgWriter.Wait() + + fmt.Printf("Done\n") +}