User buffer pool where appropriate

This commit is contained in:
Ingo Oppermann
2024-10-10 15:09:50 +02:00
parent 91874e6caf
commit a581f1dbc2
13 changed files with 207 additions and 67 deletions

View File

@@ -1,7 +1,6 @@
package raft
import (
"bytes"
"encoding/base64"
"fmt"
"io"
@@ -16,6 +15,7 @@ import (
"github.com/datarhei/core/v16/cluster/store"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
"go.etcd.io/bbolt"
"github.com/hashicorp/go-hclog"
@@ -359,14 +359,14 @@ func (r *raft) Snapshot() (io.ReadCloser, error) {
Data: base64.StdEncoding.EncodeToString(data),
}
buffer := bytes.Buffer{}
enc := json.NewEncoder(&buffer)
buffer := mem.Get()
enc := json.NewEncoder(buffer)
err = enc.Encode(snapshot)
if err != nil {
return nil, err
}
return &readCloserWrapper{&buffer}, nil
return &readCloserWrapper{buffer}, nil
}
func (r *raft) start(fsm hcraft.FSM, peers []Peer, inmem bool) error {

View File

@@ -1,7 +1,6 @@
package client
import (
"bytes"
"context"
"fmt"
"io"
@@ -15,6 +14,7 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/mem"
"github.com/datarhei/core/v16/restream/app"
"github.com/Masterminds/semver/v3"
@@ -199,6 +199,8 @@ type restclient struct {
connectedCore *semver.Version
methods map[string][]apiconstraint
}
pool *mem.BufferPool
}
// New returns a new REST API client for the given config. The error is non-nil
@@ -212,6 +214,7 @@ func New(config Config) (RestClient, error) {
auth0Token: config.Auth0Token,
client: config.Client,
clientTimeout: config.Timeout,
pool: mem.NewBufferPool(),
}
if len(config.AccessToken) != 0 {
@@ -652,12 +655,13 @@ func (r *restclient) login() error {
login.Password = r.password
}
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(login)
req, err := http.NewRequest("POST", r.address+r.prefix+"/login", &buf)
req, err := http.NewRequest("POST", r.address+r.prefix+"/login", buf)
if err != nil {
return err
}

View File

@@ -1,7 +1,6 @@
package client
import (
"bytes"
"context"
"io"
"net/http"
@@ -11,15 +10,16 @@ import (
)
func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(filters)
header := make(http.Header)
header.Set("Accept", "application/x-json-stream")
stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", &buf)
stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", buf)
if err != nil {
return nil, err
}

View File

@@ -1,7 +1,6 @@
package client
import (
"bytes"
"net/url"
"strings"
@@ -66,15 +65,16 @@ func (r *restclient) Process(id app.ProcessID, filter []string) (api.Process, er
}
func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) error {
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, metadata)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(config)
_, err := r.call("POST", "/v3/process", nil, nil, "application/json", &buf)
_, err := r.call("POST", "/v3/process", nil, nil, "application/json", buf)
if err != nil {
return err
}
@@ -83,18 +83,19 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{})
}
func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map[string]interface{}) error {
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, metadata)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(config)
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", &buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", buf)
if err != nil {
return err
}
@@ -103,18 +104,19 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map
}
func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) error {
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
data := api.ProcessReport{}
data.Unmarshal(report)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(data)
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", &buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", buf)
if err != nil {
return err
}
@@ -132,9 +134,10 @@ func (r *restclient) ProcessDelete(id app.ProcessID) error {
}
func (r *restclient) ProcessCommand(id app.ProcessID, command string) error {
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(api.Command{
Command: command,
})
@@ -142,7 +145,7 @@ func (r *restclient) ProcessCommand(id app.ProcessID, command string) error {
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", &buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", buf)
return err
}
@@ -170,15 +173,16 @@ func (r *restclient) ProcessMetadata(id app.ProcessID, key string) (api.Metadata
}
func (r *restclient) ProcessMetadataSet(id app.ProcessID, key string, metadata api.Metadata) error {
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(metadata)
query := &url.Values{}
query.Set("domain", id.Domain)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", &buf)
_, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", buf)
return err
}
@@ -201,15 +205,17 @@ func (r *restclient) ProcessProbe(id app.ProcessID) (api.Probe, error) {
func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) {
var probe api.Probe
var buf bytes.Buffer
buf := r.pool.Get()
defer r.pool.Put(buf)
config := api.ProcessConfig{}
config.Unmarshal(p, nil)
e := json.NewEncoder(&buf)
e := json.NewEncoder(buf)
e.Encode(config)
data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", &buf)
data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", buf)
if err != nil {
return probe, err
}

View File

@@ -0,0 +1,29 @@
#EXTM3U
#EXT-X-VERSION:6
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:303
#EXT-X-INDEPENDENT-SEGMENTS
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:35.019+0200
/path/to/foobar/test_0_0_0303.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:37.019+0200
/path/to/foobar/test_0_0_0304.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:39.019+0200
/path/to/foobar/test_0_0_0305.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:41.019+0200
/path/to/foobar/test_0_0_0306.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:43.019+0200
/path/to/foobar/test_0_0_0307.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:45.019+0200
/path/to/foobar/test_0_0_0308.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:47.019+0200
/path/to/foobar/test_0_0_0309.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:49.019+0200
/path/to/foobar/test_0_0_0310.ts

View File

@@ -0,0 +1,29 @@
#EXTM3U
#EXT-X-VERSION:6
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:303
#EXT-X-INDEPENDENT-SEGMENTS
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:35.019+0200
test_0_0_0303.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:37.019+0200
test_0_0_0304.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:39.019+0200
test_0_0_0305.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:41.019+0200
test_0_0_0306.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:43.019+0200
test_0_0_0307.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:45.019+0200
test_0_0_0308.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:47.019+0200
test_0_0_0309.ts
#EXTINF:2.000000,
#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:49.019+0200
test_0_0_0310.ts

View File

@@ -6,6 +6,7 @@ import (
"net/http"
"strings"
"github.com/datarhei/core/v16/mem"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
@@ -31,7 +32,7 @@ func NewHLSRewrite() echo.MiddlewareFunc {
}
type hlsrewrite struct {
pathPrefix string
pathPrefix []byte
}
func NewHLSRewriteWithConfig(config HLSRewriteConfig) echo.MiddlewareFunc {
@@ -47,7 +48,7 @@ func NewHLSRewriteWithConfig(config HLSRewriteConfig) echo.MiddlewareFunc {
}
hls := hlsrewrite{
pathPrefix: pathPrefix,
pathPrefix: []byte(pathPrefix),
}
return func(next echo.HandlerFunc) echo.HandlerFunc {
@@ -91,6 +92,7 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error {
// the data that we need to rewrite.
rewriter = &hlsRewriter{
ResponseWriter: res.Writer,
buffer: mem.Get(),
}
res.Writer = rewriter
@@ -104,16 +106,20 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error {
res.Writer = writer
if rewrite {
if res.Status != 200 {
if res.Status == 200 {
// Rewrite the data befor sending it to the client
buffer := mem.Get()
defer mem.Put(buffer)
rewriter.rewrite(h.pathPrefix, buffer)
res.Header().Set("Cache-Control", "private")
res.Write(buffer.Bytes())
} else {
res.Write(rewriter.buffer.Bytes())
return nil
}
// Rewrite the data befor sending it to the client
rewriter.rewrite(h.pathPrefix)
res.Header().Set("Cache-Control", "private")
res.Write(rewriter.buffer.Bytes())
mem.Put(rewriter.buffer)
}
return nil
@@ -121,7 +127,7 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error {
type hlsRewriter struct {
http.ResponseWriter
buffer bytes.Buffer
buffer *bytes.Buffer
}
func (g *hlsRewriter) Write(data []byte) (int, error) {
@@ -131,34 +137,29 @@ func (g *hlsRewriter) Write(data []byte) (int, error) {
return w, err
}
func (g *hlsRewriter) rewrite(pathPrefix string) {
var buffer bytes.Buffer
func (g *hlsRewriter) rewrite(pathPrefix []byte, buffer *bytes.Buffer) {
// Find all URLS in the .m3u8 and add the session ID to the query string
scanner := bufio.NewScanner(&g.buffer)
scanner := bufio.NewScanner(g.buffer)
for scanner.Scan() {
line := scanner.Text()
line := scanner.Bytes()
// Write empty lines unmodified
if len(line) == 0 {
buffer.WriteString(line + "\n")
buffer.Write(line)
buffer.WriteByte('\n')
continue
}
// Write comments unmodified
if strings.HasPrefix(line, "#") {
buffer.WriteString(line + "\n")
if line[0] == '#' {
buffer.Write(line)
buffer.WriteByte('\n')
continue
}
// Rewrite
line = strings.TrimPrefix(line, pathPrefix)
buffer.WriteString(line + "\n")
line = bytes.TrimPrefix(line, pathPrefix)
buffer.Write(line)
buffer.WriteByte('\n')
}
if err := scanner.Err(); err != nil {
return
}
g.buffer = buffer
}

View File

@@ -0,0 +1,49 @@
package hlsrewrite
import (
"bytes"
"os"
"testing"
"github.com/stretchr/testify/require"
)
func TestRewrite(t *testing.T) {
data, err := os.ReadFile("./fixtures/data.txt")
require.NoError(t, err)
rewrittendata, err := os.ReadFile("./fixtures/data_rewritten.txt")
require.NoError(t, err)
r := &hlsRewriter{
buffer: &bytes.Buffer{},
}
r.Write(data)
buffer := &bytes.Buffer{}
prefix := []byte("/path/to/foobar/")
r.rewrite(prefix, buffer)
require.Equal(t, rewrittendata, buffer.Bytes())
}
func BenchmarkRewrite(b *testing.B) {
data, err := os.ReadFile("./fixtures/data.txt")
require.NoError(b, err)
r := &hlsRewriter{
buffer: &bytes.Buffer{},
}
buffer := &bytes.Buffer{}
prefix := []byte("/path/to/foobar/")
for i := 0; i < b.N; i++ {
r.buffer.Reset()
r.Write(data)
buffer.Reset()
r.rewrite(prefix, buffer)
}
}

View File

@@ -379,10 +379,12 @@ func (fs *memFilesystem) ReadFile(path string) ([]byte, error) {
}
}
data := make([]byte, file.data.Len())
copy(data, file.data.Bytes())
data := pool.Get()
return data, nil
data.Grow(file.data.Len())
data.Write(file.data.Bytes())
return data.Bytes(), nil
}
func (fs *memFilesystem) Symlink(oldname, newname string) error {

View File

@@ -14,6 +14,7 @@ import (
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
@@ -275,7 +276,7 @@ func (fs *s3Filesystem) ReadFile(path string) ([]byte, error) {
defer file.Close()
buf := &bytes.Buffer{}
buf := mem.Get() // here we take out a buffer for good
_, err := buf.ReadFrom(file)
if err != nil {
@@ -371,11 +372,13 @@ func (fs *s3Filesystem) AppendFileReader(path string, r io.Reader, sizeHint int)
return size, err
}
buffer := bytes.Buffer{}
buffer := mem.Get()
defer mem.Put(buffer)
buffer.ReadFrom(object)
buffer.ReadFrom(r)
size, _, err := fs.write(path, &buffer)
size, _, err := fs.write(path, buffer)
return size, err
}

View File

@@ -31,3 +31,17 @@ func (p *BufferPool) Get() *bytes.Buffer {
func (p *BufferPool) Put(buf *bytes.Buffer) {
p.pool.Put(buf)
}
var DefaultBufferPool *BufferPool
func init() {
DefaultBufferPool = NewBufferPool()
}
func Get() *bytes.Buffer {
return DefaultBufferPool.Get()
}
func Put(buf *bytes.Buffer) {
DefaultBufferPool.Put(buf)
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/mem"
)
type API interface {
@@ -227,8 +228,10 @@ func (a *api) call(method, path string, body io.Reader) ([]byte, error) {
}
func (a *api) Monitor(id string, monitordata MonitorData) (MonitorResponse, error) {
var data bytes.Buffer
encoder := json.NewEncoder(&data)
data := mem.Get()
defer mem.Put(data)
encoder := json.NewEncoder(data)
if err := encoder.Encode(monitordata); err != nil {
return MonitorResponse{}, err
}
@@ -240,7 +243,7 @@ func (a *api) Monitor(id string, monitordata MonitorData) (MonitorResponse, erro
}
*/
response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, &data)
response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, data)
if err != nil {
return MonitorResponse{}, fmt.Errorf("error sending request: %w", err)
}

View File

@@ -196,9 +196,9 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t
"buffer": bufferDuration,
}).Log("Session persister started")
buffer := &bytes.Buffer{}
path := pattern.FormatString(time.Now())
buffer := &bytes.Buffer{}
enc := json.NewEncoder(buffer)
ticker := time.NewTicker(bufferDuration)