Implement own byte buffer type

This commit is contained in:
Ingo Oppermann
2024-10-10 16:35:39 +02:00
parent a581f1dbc2
commit 719449a4c8
20 changed files with 217 additions and 217 deletions

View File

@@ -1,6 +1,7 @@
package raft
import (
"bytes"
"encoding/base64"
"fmt"
"io"
@@ -15,7 +16,6 @@ import (
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
"go.etcd.io/bbolt"
"github.com/hashicorp/go-hclog"
@@ -318,18 +318,6 @@ func (r *raft) LeadershipTransfer(id string) error {
return nil
}
type readCloserWrapper struct {
io.Reader
}
func (rcw *readCloserWrapper) Read(p []byte) (int, error) {
return rcw.Reader.Read(p)
}
func (rcw *readCloserWrapper) Close() error {
return nil
}
type Snapshot struct {
Metadata *hcraft.SnapshotMeta
Data string
@@ -359,14 +347,14 @@ func (r *raft) Snapshot() (io.ReadCloser, error) {
Data: base64.StdEncoding.EncodeToString(data),
}
buffer := mem.Get()
buffer := &bytes.Buffer{}
enc := json.NewEncoder(buffer)
err = enc.Encode(snapshot)
if err != nil {
return nil, err
}
return &readCloserWrapper{buffer}, nil
return io.NopCloser(buffer), nil
}
func (r *raft) start(fsm hcraft.FSM, peers []Peer, inmem bool) error {

View File

@@ -661,7 +661,7 @@ func (r *restclient) login() error {
e := json.NewEncoder(buf)
e.Encode(login)
req, err := http.NewRequest("POST", r.address+r.prefix+"/login", buf)
req, err := http.NewRequest("POST", r.address+r.prefix+"/login", buf.Reader())
if err != nil {
return err
}

View File

@@ -19,7 +19,7 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch
header := make(http.Header)
header.Set("Accept", "application/x-json-stream")
stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", buf)
stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", buf.Reader())
if err != nil {
return nil, err
}

View File

@@ -74,7 +74,7 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{})
e := json.NewEncoder(buf)
e.Encode(config)
_, err := r.call("POST", "/v3/process", nil, nil, "application/json", buf)
_, err := r.call("POST", "/v3/process", nil, nil, "application/json", buf.Reader())
if err != nil {
return err
}
@@ -95,7 +95,7 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", buf.Reader())
if err != nil {
return err
}
@@ -116,7 +116,7 @@ func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) erro
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", buf.Reader())
if err != nil {
return err
}
@@ -145,7 +145,7 @@ func (r *restclient) ProcessCommand(id app.ProcessID, command string) error {
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", buf.Reader())
return err
}
@@ -182,7 +182,7 @@ func (r *restclient) ProcessMetadataSet(id app.ProcessID, key string, metadata a
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", buf.Reader())
return err
}
@@ -215,7 +215,7 @@ func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) {
e := json.NewEncoder(buf)
e.Encode(config)
data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", buf)
data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", buf.Reader())
if err != nil {
return probe, err
}

View File

@@ -2,7 +2,6 @@ package compress
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
@@ -55,7 +54,7 @@ type compressResponseWriter struct {
wroteBody bool
minLength int
minLengthExceeded bool
buffer *bytes.Buffer
buffer *mem.Buffer
code int
headerContentLength string
scheme string

View File

@@ -127,7 +127,7 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error {
type hlsRewriter struct {
http.ResponseWriter
buffer *bytes.Buffer
buffer *mem.Buffer
}
func (g *hlsRewriter) Write(data []byte) (int, error) {
@@ -137,9 +137,9 @@ func (g *hlsRewriter) Write(data []byte) (int, error) {
return w, err
}
func (g *hlsRewriter) rewrite(pathPrefix []byte, buffer *bytes.Buffer) {
func (g *hlsRewriter) rewrite(pathPrefix []byte, buffer *mem.Buffer) {
// Find all URLS in the .m3u8 and add the session ID to the query string
scanner := bufio.NewScanner(g.buffer)
scanner := bufio.NewScanner(g.buffer.Reader())
for scanner.Scan() {
line := scanner.Bytes()

View File

@@ -1,10 +1,10 @@
package hlsrewrite
import (
"bytes"
"os"
"testing"
"github.com/datarhei/core/v16/mem"
"github.com/stretchr/testify/require"
)
@@ -16,12 +16,12 @@ func TestRewrite(t *testing.T) {
require.NoError(t, err)
r := &hlsRewriter{
buffer: &bytes.Buffer{},
buffer: &mem.Buffer{},
}
r.Write(data)
buffer := &bytes.Buffer{}
buffer := &mem.Buffer{}
prefix := []byte("/path/to/foobar/")
r.rewrite(prefix, buffer)
@@ -33,10 +33,10 @@ func BenchmarkRewrite(b *testing.B) {
require.NoError(b, err)
r := &hlsRewriter{
buffer: &bytes.Buffer{},
buffer: &mem.Buffer{},
}
buffer := &bytes.Buffer{}
buffer := &mem.Buffer{}
prefix := []byte("/path/to/foobar/")
for i := 0; i < b.N; i++ {

View File

@@ -3,7 +3,6 @@ package session
import (
"bufio"
"bytes"
"io"
"net/http"
"net/url"
@@ -11,6 +10,7 @@ import (
"path/filepath"
"strings"
"github.com/datarhei/core/v16/mem"
"github.com/datarhei/core/v16/net"
"github.com/lithammer/shortuuid/v4"
@@ -233,7 +233,7 @@ func (h *handler) handleHLSEgress(c echo.Context, _ string, data map[string]inte
type segmentReader struct {
reader io.ReadCloser
buffer *bytes.Buffer
buffer *mem.Buffer
size int64
}
@@ -255,7 +255,7 @@ func (r *segmentReader) getSegments(dir string) []string {
segments := []string{}
// Find all segment URLs in the .m3u8
scanner := bufio.NewScanner(r.buffer)
scanner := bufio.NewScanner(r.buffer.Reader())
for scanner.Scan() {
line := scanner.Text()
@@ -299,7 +299,7 @@ func (r *segmentReader) getSegments(dir string) []string {
type sessionRewriter struct {
http.ResponseWriter
buffer *bytes.Buffer
buffer *mem.Buffer
}
func (g *sessionRewriter) Write(data []byte) (int, error) {
@@ -307,11 +307,11 @@ func (g *sessionRewriter) Write(data []byte) (int, error) {
return g.buffer.Write(data)
}
func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL, buffer *bytes.Buffer) {
func (g *sessionRewriter) rewriteHLS(sessionID string, requestURL *url.URL, buffer *mem.Buffer) {
isMaster := false
// Find all URLS in the .m3u8 and add the session ID to the query string
scanner := bufio.NewScanner(g.buffer)
scanner := bufio.NewScanner(g.buffer.Reader())
for scanner.Scan() {
byteline := scanner.Bytes()

View File

@@ -19,7 +19,7 @@ func TestHLSSegmentReader(t *testing.T) {
br := &segmentReader{
reader: io.NopCloser(r),
buffer: &bytes.Buffer{},
buffer: &mem.Buffer{},
}
_, err = io.ReadAll(br)
@@ -66,7 +66,7 @@ func TestHLSRewrite(t *testing.T) {
require.NoError(t, err)
br := &sessionRewriter{
buffer: &bytes.Buffer{},
buffer: &mem.Buffer{},
}
_, err = br.Write(data)
@@ -75,7 +75,7 @@ func TestHLSRewrite(t *testing.T) {
u, err := url.Parse("http://example.com/test.m3u8")
require.NoError(t, err)
buffer := &bytes.Buffer{}
buffer := &mem.Buffer{}
br.rewriteHLS("oT5GV8eWBbRAh4aib5egoK", u, buffer)

View File

@@ -1,7 +1,6 @@
package session
import (
"bytes"
"fmt"
"io"
"net/http"
@@ -177,7 +176,7 @@ func verifySession(raw interface{}, path, referrer string) (map[string]interface
return data, nil
}
func headerSize(header http.Header, buffer *bytes.Buffer) int64 {
func headerSize(header http.Header, buffer *mem.Buffer) int64 {
buffer.Reset()
header.Write(buffer)

View File

@@ -1,11 +1,11 @@
package session
import (
"bytes"
"net/http"
"testing"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/mem"
"github.com/stretchr/testify/require"
)
@@ -143,7 +143,7 @@ func TestHeaderSize(t *testing.T) {
header.Add("Content-Type", "application/json")
header.Add("Content-Encoding", "gzip")
buffer := &bytes.Buffer{}
buffer := &mem.Buffer{}
size := headerSize(header, buffer)
require.Equal(t, "Content-Encoding: gzip\r\nContent-Type: application/json\r\n", buffer.String())
@@ -156,7 +156,7 @@ func BenchmarkHeaderSize(b *testing.B) {
header.Add("Content-Type", "application/json")
header.Add("Content-Encoding", "gzip")
buffer := &bytes.Buffer{}
buffer := &mem.Buffer{}
for i := 0; i < b.N; i++ {
headerSize(header, buffer)

View File

@@ -2,7 +2,6 @@ package fs
import (
"bytes"
"errors"
"fmt"
"io"
"io/fs"
@@ -69,8 +68,8 @@ func (f *memFileInfo) IsDir() bool {
type memFile struct {
memFileInfo
data *bytes.Buffer // Contents of the file
r *bytes.Reader
data *mem.Buffer // Contents of the file
r io.ReadSeeker
}
func (f *memFile) Name() string {
@@ -380,9 +379,7 @@ func (fs *memFilesystem) ReadFile(path string) ([]byte, error) {
}
data := pool.Get()
data.Grow(file.data.Len())
data.Write(file.data.Bytes())
file.data.WriteTo(data)
return data.Bytes(), nil
}
@@ -433,35 +430,6 @@ func (fs *memFilesystem) Symlink(oldname, newname string) error {
return nil
}
func copyToBufferFromReader(buf *bytes.Buffer, r io.Reader, _ int) (int64, error) {
chunkData := [128 * 1024]byte{}
chunk := chunkData[0:]
size := int64(0)
for {
n, err := r.Read(chunk)
if n != 0 {
buf.Write(chunk[:n])
size += int64(n)
}
if err != nil {
if errors.Is(err, io.EOF) {
return size, nil
}
return size, err
}
if n == 0 {
break
}
}
return size, nil
}
func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int) (int64, bool, error) {
path = fs.cleanPath(path)
@@ -480,11 +448,7 @@ func (fs *memFilesystem) WriteFileReader(path string, r io.Reader, sizeHint int)
data: pool.Get(),
}
if sizeHint > 0 && sizeHint < 5*1024*1024 {
newFile.data.Grow(sizeHint)
}
size, err := copyToBufferFromReader(newFile.data, r, 8*1024)
size, err := newFile.data.ReadFrom(r)
if err != nil {
fs.logger.WithFields(log.Fields{
"path": path,
@@ -558,10 +522,9 @@ func (fs *memFilesystem) AppendFileReader(path string, r io.Reader, sizeHint int
data: pool.Get(),
}
newFile.data.Grow(file.data.Len())
newFile.data.Write(file.data.Bytes())
file.data.WriteTo(newFile.data)
size, err := copyToBufferFromReader(newFile.data, r, 8*1024)
size, err := newFile.data.ReadFrom(r)
if err != nil {
fs.logger.WithFields(log.Fields{
"path": path,
@@ -720,8 +683,7 @@ func (fs *memFilesystem) Copy(src, dst string) error {
data: pool.Get(),
}
dstFile.data.Grow(srcFile.data.Len())
dstFile.data.Write(srcFile.data.Bytes())
srcFile.data.WriteTo(dstFile.data)
f, replace := fs.storage.Store(dst, dstFile)

View File

@@ -1,7 +1,6 @@
package fs
import (
"bytes"
"sync"
"github.com/dolthub/swiss"
@@ -122,33 +121,6 @@ func (m *mapStorage) Load(key string) (*memFile, bool) {
return v, ok
}
func (m *mapStorage) LoadAndCopy(key string) (*memFile, bool) {
m.lock.RLock()
defer m.lock.RUnlock()
v, ok := m.files[key]
if !ok {
return nil, false
}
f := &memFile{
memFileInfo: memFileInfo{
name: v.name,
size: v.size,
dir: v.dir,
lastMod: v.lastMod,
linkTo: v.linkTo,
},
r: nil,
}
if v.data != nil {
f.data = bytes.NewBuffer(v.data.Bytes())
}
return f, true
}
func (m *mapStorage) Has(key string) bool {
m.lock.RLock()
defer m.lock.RUnlock()
@@ -214,33 +186,6 @@ func (m *swissMapStorage) Load(key string) (*memFile, bool) {
return m.files.Get(key)
}
func (m *swissMapStorage) LoadAndCopy(key string) (*memFile, bool) {
token := m.lock.RLock()
defer m.lock.RUnlock(token)
v, ok := m.files.Get(key)
if !ok {
return nil, false
}
f := &memFile{
memFileInfo: memFileInfo{
name: v.name,
size: v.size,
dir: v.dir,
lastMod: v.lastMod,
linkTo: v.linkTo,
},
r: nil,
}
if v.data != nil {
f.data = bytes.NewBuffer(v.data.Bytes())
}
return f, true
}
func (m *swissMapStorage) Has(key string) bool {
token := m.lock.RLock()
defer m.lock.RUnlock(token)

View File

@@ -1,7 +1,6 @@
package fs
import (
"bytes"
"context"
"fmt"
"io"
@@ -231,36 +230,3 @@ func benchmarkMemReadFileWhileWriting(b *testing.B, fs Filesystem) {
readerWg.Wait()
}
func BenchmarkBufferReadFrom(b *testing.B) {
data := []byte(rand.StringAlphanumeric(1024 * 1024))
for i := 0; i < b.N; i++ {
r := bytes.NewReader(data)
buf := &bytes.Buffer{}
buf.ReadFrom(r)
}
}
func TestBufferReadChunks(t *testing.T) {
data := []byte(rand.StringAlphanumeric(1024 * 1024))
r := bytes.NewReader(data)
buf := &bytes.Buffer{}
copyToBufferFromReader(buf, r, 32*1024)
res := bytes.Compare(data, buf.Bytes())
require.Equal(t, 0, res)
}
func BenchmarkBufferReadChunks(b *testing.B) {
data := []byte(rand.StringAlphanumeric(1024 * 1024))
for i := 0; i < b.N; i++ {
r := bytes.NewReader(data)
buf := &bytes.Buffer{}
copyToBufferFromReader(buf, r, 32*1024)
}
}

View File

@@ -378,7 +378,7 @@ func (fs *s3Filesystem) AppendFileReader(path string, r io.Reader, sizeHint int)
buffer.ReadFrom(object)
buffer.ReadFrom(r)
size, _, err := fs.write(path, buffer)
size, _, err := fs.write(path, buffer.Reader())
return size, err
}

View File

@@ -71,8 +71,10 @@ func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader, sizeHint in
return r.Filesystem.WriteFileReader(path, rd, sizeHint)
}
data := bytes.Buffer{}
size, err := copyToBufferFromReader(&data, rd, 8*1024)
data := pool.Get()
defer pool.Put(data)
size, err := data.ReadFrom(rd)
if err != nil {
return -1, false, err
}
@@ -97,7 +99,7 @@ func (r *sizedFilesystem) WriteFileReader(path string, rd io.Reader, sizeHint in
}
}
return r.Filesystem.WriteFileReader(path, &data, int(size))
return r.Filesystem.WriteFileReader(path, data.Reader(), int(size))
}
func (r *sizedFilesystem) WriteFile(path string, data []byte) (int64, bool, error) {
@@ -141,8 +143,10 @@ func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint i
return r.Filesystem.AppendFileReader(path, rd, sizeHint)
}
data := bytes.Buffer{}
size, err := copyToBufferFromReader(&data, rd, 8*1024)
data := pool.Get()
defer pool.Put(data)
size, err := data.ReadFrom(rd)
if err != nil {
return -1, err
}
@@ -162,7 +166,7 @@ func (r *sizedFilesystem) AppendFileReader(path string, rd io.Reader, sizeHint i
}
}
return r.Filesystem.AppendFileReader(path, &data, int(size))
return r.Filesystem.AppendFileReader(path, data.Reader(), int(size))
}
func (r *sizedFilesystem) Purge(size int64) int64 {

View File

@@ -2,46 +2,90 @@ package mem
import (
"bytes"
"sync"
"errors"
"io"
)
type BufferPool struct {
pool sync.Pool
type Buffer struct {
data bytes.Buffer
}
func NewBufferPool() *BufferPool {
p := &BufferPool{
pool: sync.Pool{
New: func() any {
return &bytes.Buffer{}
},
},
// Len returns the length of the buffer.
func (b *Buffer) Len() int {
return b.data.Len()
}
// Bytes returns the buffer, but keeps ownership.
func (b *Buffer) Bytes() []byte {
return b.data.Bytes()
}
// WriteTo writes the bytes to the writer.
func (b *Buffer) WriteTo(w io.Writer) (int64, error) {
n, err := w.Write(b.data.Bytes())
return int64(n), err
}
// Reset empties the buffer and keeps it's capacity.
func (b *Buffer) Reset() {
b.data.Reset()
}
// Write appends to the buffer.
func (b *Buffer) Write(p []byte) (int, error) {
return b.data.Write(p)
}
// ReadFrom reads from the reader and appends to the buffer.
func (b *Buffer) ReadFrom(r io.Reader) (int64, error) {
if br, ok := r.(*bytes.Reader); ok {
b.data.Grow(br.Len())
}
return p
chunkData := [128 * 1024]byte{}
chunk := chunkData[0:]
size := int64(0)
for {
n, err := r.Read(chunk)
if n != 0 {
b.data.Write(chunk[:n])
size += int64(n)
}
if err != nil {
if errors.Is(err, io.EOF) {
return size, nil
}
return size, err
}
if n == 0 {
break
}
}
return size, nil
}
func (p *BufferPool) Get() *bytes.Buffer {
buf := p.pool.Get().(*bytes.Buffer)
buf.Reset()
return buf
// WriteByte appends a byte to the buffer.
func (b *Buffer) WriteByte(c byte) error {
return b.data.WriteByte(c)
}
func (p *BufferPool) Put(buf *bytes.Buffer) {
p.pool.Put(buf)
// WriteString appends a string to the buffer.
func (b *Buffer) WriteString(s string) (n int, err error) {
return b.data.WriteString(s)
}
var DefaultBufferPool *BufferPool
func init() {
DefaultBufferPool = NewBufferPool()
// Reader returns a bytes.Reader based on the data in the buffer.
func (b *Buffer) Reader() *bytes.Reader {
return bytes.NewReader(b.Bytes())
}
func Get() *bytes.Buffer {
return DefaultBufferPool.Get()
}
func Put(buf *bytes.Buffer) {
DefaultBufferPool.Put(buf)
// String returns the data in the buffer a string.
func (b *Buffer) String() string {
return b.data.String()
}

47
mem/buffer_test.go Normal file
View File

@@ -0,0 +1,47 @@
package mem
import (
"bytes"
"io"
"testing"
"github.com/datarhei/core/v16/math/rand"
"github.com/stretchr/testify/require"
)
func TestBufferReadChunks(t *testing.T) {
data := []byte(rand.StringAlphanumeric(1024 * 1024))
r := bytes.NewReader(data)
buf := &Buffer{}
buf.ReadFrom(r)
res := bytes.Compare(data, buf.Bytes())
require.Equal(t, 0, res)
}
func BenchmarkBufferReadFrom(b *testing.B) {
data := []byte(rand.StringAlphanumeric(1024 * 1024))
r := bytes.NewReader(data)
for i := 0; i < b.N; i++ {
r.Seek(0, io.SeekStart)
buf := &Buffer{}
buf.ReadFrom(r)
}
}
func BenchmarkBytesBufferReadFrom(b *testing.B) {
data := []byte(rand.StringAlphanumeric(1024 * 1024))
r := bytes.NewReader(data)
for i := 0; i < b.N; i++ {
r.Seek(0, io.SeekStart)
buf := &bytes.Buffer{}
buf.ReadFrom(r)
}
}

46
mem/pool.go Normal file
View File

@@ -0,0 +1,46 @@
package mem
import (
"sync"
)
type BufferPool struct {
pool sync.Pool
}
func NewBufferPool() *BufferPool {
p := &BufferPool{
pool: sync.Pool{
New: func() any {
return &Buffer{}
},
},
}
return p
}
func (p *BufferPool) Get() *Buffer {
buf := p.pool.Get().(*Buffer)
buf.Reset()
return buf
}
func (p *BufferPool) Put(buf *Buffer) {
p.pool.Put(buf)
}
var DefaultBufferPool *BufferPool
func init() {
DefaultBufferPool = NewBufferPool()
}
func Get() *Buffer {
return DefaultBufferPool.Get()
}
func Put(buf *Buffer) {
DefaultBufferPool.Put(buf)
}

View File

@@ -243,7 +243,7 @@ func (a *api) Monitor(id string, monitordata MonitorData) (MonitorResponse, erro
}
*/
response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, data)
response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, data.Reader())
if err != nil {
return MonitorResponse{}, fmt.Errorf("error sending request: %w", err)
}