Add buffer pool for memfs

This commit is contained in:
Ingo Oppermann
2024-10-08 14:27:23 +02:00
parent 30af9e9c36
commit fe2cbd4f60
5 changed files with 346 additions and 64 deletions

View File

@@ -105,17 +105,60 @@ func (f *memFile) Seek(offset int64, whence int) (int64, error) {
}
func (f *memFile) Close() error {
if f.data == nil {
if f.r == nil {
return io.EOF
}
f.r = nil
f.data.Reset()
f.data = nil
return nil
}
func (f *memFile) free() {
f.Close()
if f.data == nil {
return
}
pool.Put(f.data)
f.data = nil
}
type fileDataPool struct {
pool sync.Pool
}
var pool *fileDataPool = nil
func NewFileDataPool() *fileDataPool {
p := &fileDataPool{
pool: sync.Pool{
New: func() any {
return &bytes.Buffer{}
},
},
}
return p
}
func (p *fileDataPool) Get() *bytes.Buffer {
buf := p.pool.Get().(*bytes.Buffer)
buf.Reset()
return buf
}
func (p *fileDataPool) Put(buf *bytes.Buffer) {
p.pool.Put(buf)
}
func init() {
pool = NewFileDataPool()
}
type memFilesystem struct {
metadata map[string]string
metaLock sync.RWMutex
@@ -315,7 +358,7 @@ func (fs *memFilesystem) Files() int64 {
func (fs *memFilesystem) Open(path string) File {
path = fs.cleanPath(path)
file, ok := fs.storage.LoadAndCopy(path)
file, ok := fs.storage.Load(path)
if !ok {
return nil
}
@@ -323,24 +366,26 @@ func (fs *memFilesystem) Open(path string) File {
newFile := &memFile{
memFileInfo: memFileInfo{
name: file.name,
size: file.size,
dir: file.dir,
lastMod: file.lastMod,
linkTo: file.linkTo,
},
data: file.data,
}
if len(file.linkTo) != 0 {
file.Close()
file, ok = fs.storage.LoadAndCopy(file.linkTo)
file, ok := fs.storage.Load(file.linkTo)
if !ok {
return nil
}
newFile.lastMod = file.lastMod
newFile.data = file.data
newFile.size = file.size
}
newFile.lastMod = file.lastMod
newFile.data = file.data
newFile.size = file.size
newFile.r = bytes.NewReader(file.data.Bytes())
newFile.r = bytes.NewReader(newFile.data.Bytes())
return newFile
}
@@ -348,22 +393,22 @@ func (fs *memFilesystem) Open(path string) File {
func (fs *memFilesystem) ReadFile(path string) ([]byte, error) {
path = fs.cleanPath(path)
file, ok := fs.storage.LoadAndCopy(path)
file, ok := fs.storage.Load(path)
if !ok {
return nil, ErrNotExist
}
if len(file.linkTo) != 0 {
file.Close()
file, ok = fs.storage.LoadAndCopy(file.linkTo)
file, ok = fs.storage.Load(file.linkTo)
if !ok {
return nil, ErrNotExist
}
}
defer file.Close()
return file.data.Bytes(), nil
data := make([]byte, file.data.Len())
copy(data, file.data.Bytes())
return data, nil
}
func (fs *memFilesystem) Symlink(oldname, newname string) error {
@@ -403,7 +448,7 @@ func (fs *memFilesystem) Symlink(oldname, newname string) error {
defer fs.sizeLock.Unlock()
if replaced {
oldFile.Close()
oldFile.free()
fs.currentSize -= oldFile.size
}
@@ -456,7 +501,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
size: 0,
lastMod: time.Now(),
},
data: &bytes.Buffer{},
data: pool.Get(),
}
if sizeHint > 0 && sizeHint < 5*1024*1024 {
@@ -471,7 +516,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
"error": err,
}).Warn().Log("Incomplete file")
newFile.Close()
newFile.free()
return -1, false, fmt.Errorf("incomplete file")
}
@@ -488,7 +533,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
defer fs.sizeLock.Unlock()
if replace {
oldFile.Close()
oldFile.free()
fs.currentSize -= oldFile.size
}
@@ -521,13 +566,26 @@ func (fs *memFilesystem) WriteFileSafe(path string, data []byte) (int64, bool, e
func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int) (int64, error) {
path = fs.cleanPath(path)
file, hasFile := fs.storage.LoadAndCopy(path)
file, hasFile := fs.storage.Load(path)
if !hasFile {
size, _, err := fs.WriteFileReader(path, r, sizeHint)
return size, err
}
size, err := copyToBufferFromReader(file.data, r, 8*1024)
newFile := &memFile{
memFileInfo: memFileInfo{
name: path,
dir: false,
size: 0,
lastMod: time.Now(),
},
data: pool.Get(),
}
newFile.data.Grow(file.data.Len())
newFile.data.Write(file.data.Bytes())
size, err := copyToBufferFromReader(newFile.data, r, 8*1024)
if err != nil {
fs.logger.WithFields(log.Fields{
"path": path,
@@ -535,18 +593,22 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int
"error": err,
}).Warn().Log("Incomplete file")
file.Close()
newFile.free()
return -1, fmt.Errorf("incomplete file")
}
file.size += size
fs.storage.Store(path, file)
oldFile, replace := fs.storage.Store(path, newFile)
fs.sizeLock.Lock()
defer fs.sizeLock.Unlock()
if replace {
oldFile.free()
}
fs.currentSize += size
fs.logger.Debug().WithFields(log.Fields{
@@ -583,7 +645,7 @@ func (fs *memFilesystem) Purge(size int64) int64 {
fs.currentSize -= f.size
fs.sizeLock.Unlock()
f.Close()
f.free()
fs.logger.WithFields(log.Fields{
"path": f.name,
@@ -643,7 +705,7 @@ func (fs *memFilesystem) Rename(src, dst string) error {
defer fs.sizeLock.Unlock()
if replace {
dstFile.Close()
dstFile.free()
fs.currentSize -= dstFile.size
}
@@ -663,13 +725,12 @@ func (fs *memFilesystem) Copy(src, dst string) error {
return os.ErrInvalid
}
srcFile, ok := fs.storage.LoadAndCopy(src)
srcFile, ok := fs.storage.Load(src)
if !ok {
return ErrNotExist
}
if srcFile.dir {
srcFile.Close()
return ErrNotExist
}
@@ -680,9 +741,12 @@ func (fs *memFilesystem) Copy(src, dst string) error {
size: srcFile.size,
lastMod: time.Now(),
},
data: srcFile.data,
data: pool.Get(),
}
dstFile.data.Grow(srcFile.data.Len())
dstFile.data.Write(srcFile.data.Bytes())
f, replace := fs.storage.Store(dst, dstFile)
if !replace {
@@ -693,7 +757,7 @@ func (fs *memFilesystem) Copy(src, dst string) error {
defer fs.sizeLock.Unlock()
if replace {
f.Close()
f.free()
fs.currentSize -= f.size
}
@@ -761,7 +825,7 @@ func (fs *memFilesystem) Remove(path string) int64 {
func (fs *memFilesystem) remove(path string) int64 {
file, ok := fs.storage.Delete(path)
if ok {
file.Close()
file.free()
fs.dirs.Remove(path)
@@ -851,7 +915,7 @@ func (fs *memFilesystem) RemoveList(path string, options ListOptions) ([]string,
fs.dirs.Remove(file.name)
file.Close()
file.free()
}
fs.sizeLock.Lock()

View File

@@ -21,10 +21,6 @@ type memStorage interface {
// i.e. all changes to the file will be reflected on the storage.
Load(key string) (value *memFile, ok bool)
// LoadAndCopy loads a file from the storage. The returned file is a copy
// and can be modified without modifying the file on the storage.
LoadAndCopy(key string) (value *memFile, ok bool)
// Has checks whether a file exists at path.
Has(key string) bool
@@ -68,33 +64,6 @@ func (m *mapOfStorage) Load(key string) (*memFile, bool) {
return m.files.Load(key)
}
func (m *mapOfStorage) LoadAndCopy(key string) (*memFile, bool) {
token := m.lock.RLock()
defer m.lock.RUnlock(token)
v, ok := m.files.Load(key)
if !ok {
return nil, false
}
f := &memFile{
memFileInfo: memFileInfo{
name: v.name,
size: v.size,
dir: v.dir,
lastMod: v.lastMod,
linkTo: v.linkTo,
},
r: nil,
}
if v.data != nil {
f.data = bytes.NewBuffer(v.data.Bytes())
}
return f, true
}
func (m *mapOfStorage) Has(key string) bool {
token := m.lock.RLock()
defer m.lock.RUnlock(token)

View File

@@ -48,6 +48,34 @@ func TestWriteWhileRead(t *testing.T) {
require.Equal(t, []byte("xxxxx"), data)
}
func TestCopy(t *testing.T) {
fs, err := NewMemFilesystem(MemConfig{})
require.NoError(t, err)
_, _, err = fs.WriteFile("/foobar", []byte("xxxxx"))
require.NoError(t, err)
data, err := fs.ReadFile("/foobar")
require.NoError(t, err)
require.Equal(t, []byte("xxxxx"), data)
err = fs.Copy("/foobar", "/barfoo")
require.NoError(t, err)
data, err = fs.ReadFile("/barfoo")
require.NoError(t, err)
require.Equal(t, []byte("xxxxx"), data)
fs.Remove("/foobar")
data, err = fs.ReadFile("/barfoo")
require.NoError(t, err)
require.Equal(t, []byte("xxxxx"), data)
}
func BenchmarkMemStorages(b *testing.B) {
storages := []string{
"map",

1
io/fs/memtest/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
memtest

220
io/fs/memtest/memtest.go Normal file
View File

@@ -0,0 +1,220 @@
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
gorand "math/rand/v2"
"os"
"os/signal"
"runtime"
"runtime/debug"
"strconv"
"sync"
"time"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/math/rand"
"github.com/google/gops/agent"
)
func main() {
oStorage := "mapof"
oWriters := 500
oReaders := 1000
oFiles := 15
oInterval := 1 // seconds
oSize := 2 // megabytes
oLimit := false
flag.StringVar(&oStorage, "storage", "mapof", "type of mem storage implementation (mapof, map, swiss)")
flag.IntVar(&oWriters, "writers", 500, "number of concurrent writers")
flag.IntVar(&oReaders, "readers", 1000, "number of concurrent readers")
flag.IntVar(&oFiles, "files", 15, "number of files to keep per writer")
flag.IntVar(&oInterval, "interval", 1, "interval for writing files in seconds")
flag.IntVar(&oSize, "size", 2048, "size of files to write in kilobytes")
flag.BoolVar(&oLimit, "limit", false, "set memory limit")
flag.Parse()
estimatedSize := float64(oWriters*oFiles*oSize) / 1024 / 1024
fmt.Printf("Expecting effective memory consumption of %.1fGB\n", estimatedSize)
if oLimit {
fmt.Printf("Setting memory limit to %.1fGB\n", estimatedSize*1.5)
debug.SetMemoryLimit(int64(estimatedSize * 1.5))
}
memfs, err := fs.NewMemFilesystem(fs.MemConfig{
Storage: oStorage,
})
if err != nil {
log.Fatalf("acquiring new memfs: %s", err.Error())
}
err = agent.Listen(agent.Options{
Addr: ":9000",
ReuseSocketAddrAndPort: true,
})
if err != nil {
log.Fatalf("starting agent: %s", err.Error())
}
fmt.Printf("Started agent on :9000\n")
ctx, cancel := context.WithCancel(context.Background())
wgWriter := sync.WaitGroup{}
for i := 0; i < oWriters; i++ {
fmt.Printf("%4d / %4d writer started\r", i+1, oWriters)
wgWriter.Add(1)
go func(ctx context.Context, memfs fs.Filesystem, index int, nfiles int64, interval time.Duration) {
defer wgWriter.Done()
jitter := gorand.IntN(200)
interval += time.Duration(jitter) * time.Millisecond
sequence := int64(0)
buf := bytes.NewBufferString(rand.StringAlphanumeric(oSize * (1024 + jitter - 100)))
r := bytes.NewReader(buf.Bytes())
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
path := fmt.Sprintf("/foobar/test_%d_%06d.dat", index, sequence)
// Write file to memfs
r.Seek(0, io.SeekStart)
memfs.WriteFileReader(path, r, -1)
// Delete file from memfs
if sequence-nfiles >= 0 {
path = fmt.Sprintf("/foobar/test_%d_%06d.dat", index, sequence-nfiles)
memfs.Remove(path)
}
path = fmt.Sprintf("/foobar/test_%d.last", index)
memfs.WriteFile(path, []byte(strconv.FormatInt(sequence, 10)))
sequence++
}
}
}(ctx, memfs, i, int64(oFiles), time.Duration(oInterval)*time.Second)
}
fmt.Printf("\n")
wgReader := sync.WaitGroup{}
if oReaders > 0 {
for i := 0; i < oReaders; i++ {
fmt.Printf("%4d / %4d reader started\r", i+1, oReaders)
wgReader.Add(1)
go func(ctx context.Context, memfs fs.Filesystem, interval time.Duration) {
defer wgReader.Done()
buf := bytes.Buffer{}
jitter := gorand.IntN(200)
interval += time.Duration(jitter) * time.Millisecond
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
index := gorand.IntN(oWriters)
path := fmt.Sprintf("/foobar/test_%d.list", index)
data, err := memfs.ReadFile(path)
if err != nil {
continue
}
sequence, err := strconv.ParseUint(string(data), 10, 64)
if err != nil {
continue
}
path = fmt.Sprintf("/foobar/test_%d_%06d.dat", index, sequence)
file := memfs.Open(path)
buf.ReadFrom(file)
buf.Reset()
}
}
}(ctx, memfs, time.Duration(oInterval)*time.Second)
}
fmt.Printf("\n")
}
go func(ctx context.Context, memfs fs.Filesystem) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
nMallocs := uint64(0)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m := runtime.MemStats{}
runtime.ReadMemStats(&m)
size, _ := memfs.Size()
fmt.Printf("%5.1fGB ", float64(size)/1024/1024/1024)
listfiles := 0
listsize := int64(0)
files := memfs.List("/", fs.ListOptions{})
for _, f := range files {
listsize += f.Size()
}
listfiles = len(files)
fmt.Printf("(%7d files with %5.1fGB) ", listfiles, float64(listsize)/1024/1024/1024)
fmt.Printf("alloc=%5.1fGB (%8.1fGB) sys=%5.1fGB idle=%5.1fGB inuse=%5.1fGB mallocs=%d objects=%d\n", float64(m.HeapAlloc)/1024/1024/1024, float64(m.TotalAlloc)/1024/1024/1024, float64(m.HeapSys)/1024/1024/1024, float64(m.HeapIdle)/1024/1024/1024, float64(m.HeapInuse)/1024/1024/1024, m.Mallocs-nMallocs, m.Mallocs-m.Frees)
nMallocs = m.Mallocs
}
}
}(ctx, memfs)
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
cancel()
fmt.Printf("Waiting for readers to stop ...\n")
wgReader.Wait()
fmt.Printf("Waiting for writers to stop ...\n")
wgWriter.Wait()
fmt.Printf("Done\n")
}