diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index e07f1f1e..996345e0 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -1,7 +1,6 @@ package raft import ( - "bytes" "encoding/base64" "fmt" "io" @@ -16,6 +15,7 @@ import ( "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/mem" "go.etcd.io/bbolt" "github.com/hashicorp/go-hclog" @@ -359,14 +359,14 @@ func (r *raft) Snapshot() (io.ReadCloser, error) { Data: base64.StdEncoding.EncodeToString(data), } - buffer := bytes.Buffer{} - enc := json.NewEncoder(&buffer) + buffer := mem.Get() + enc := json.NewEncoder(buffer) err = enc.Encode(snapshot) if err != nil { return nil, err } - return &readCloserWrapper{&buffer}, nil + return &readCloserWrapper{buffer}, nil } func (r *raft) start(fsm hcraft.FSM, peers []Peer, inmem bool) error { diff --git a/http/client/client.go b/http/client/client.go index ab235c0d..ae25779e 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -1,7 +1,6 @@ package client import ( - "bytes" "context" "fmt" "io" @@ -15,6 +14,7 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/mem" "github.com/datarhei/core/v16/restream/app" "github.com/Masterminds/semver/v3" @@ -199,6 +199,8 @@ type restclient struct { connectedCore *semver.Version methods map[string][]apiconstraint } + + pool *mem.BufferPool } // New returns a new REST API client for the given config. The error is non-nil @@ -212,6 +214,7 @@ func New(config Config) (RestClient, error) { auth0Token: config.Auth0Token, client: config.Client, clientTimeout: config.Timeout, + pool: mem.NewBufferPool(), } if len(config.AccessToken) != 0 { @@ -652,12 +655,13 @@ func (r *restclient) login() error { login.Password = r.password } - var buf bytes.Buffer + buf := r.pool.Get() + defer r.pool.Put(buf) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(login) - req, err := http.NewRequest("POST", r.address+r.prefix+"/login", &buf) + req, err := http.NewRequest("POST", r.address+r.prefix+"/login", buf) if err != nil { return err } diff --git a/http/client/events.go b/http/client/events.go index 36e7063b..e2f25279 100644 --- a/http/client/events.go +++ b/http/client/events.go @@ -1,7 +1,6 @@ package client import ( - "bytes" "context" "io" "net/http" @@ -11,15 +10,16 @@ import ( ) func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) { - var buf bytes.Buffer + buf := r.pool.Get() + defer r.pool.Put(buf) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(filters) header := make(http.Header) header.Set("Accept", "application/x-json-stream") - stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", &buf) + stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", buf) if err != nil { return nil, err } diff --git a/http/client/process.go b/http/client/process.go index 38a41d18..485ab313 100644 --- a/http/client/process.go +++ b/http/client/process.go @@ -1,7 +1,6 @@ package client import ( - "bytes" "net/url" "strings" @@ -66,15 +65,16 @@ func (r *restclient) Process(id app.ProcessID, filter []string) (api.Process, er } func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) error { - var buf bytes.Buffer + buf := r.pool.Get() + defer r.pool.Put(buf) config := api.ProcessConfig{} config.Unmarshal(p, metadata) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(config) - _, err := r.call("POST", "/v3/process", nil, nil, "application/json", &buf) + _, err := r.call("POST", "/v3/process", nil, nil, "application/json", buf) if err != nil { return err } @@ -83,18 +83,19 @@ func (r *restclient) ProcessAdd(p *app.Config, metadata map[string]interface{}) } func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map[string]interface{}) error { - var buf bytes.Buffer + buf := r.pool.Get() + defer r.pool.Put(buf) config := api.ProcessConfig{} config.Unmarshal(p, metadata) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(config) query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", &buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID), query, nil, "application/json", buf) if err != nil { return err } @@ -103,18 +104,19 @@ func (r *restclient) ProcessUpdate(id app.ProcessID, p *app.Config, metadata map } func (r *restclient) ProcessReportSet(id app.ProcessID, report *app.Report) error { - var buf bytes.Buffer + buf := r.pool.Get() + defer r.pool.Put(buf) data := api.ProcessReport{} data.Unmarshal(report) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(data) query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", &buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/report", query, nil, "application/json", buf) if err != nil { return err } @@ -132,9 +134,10 @@ func (r *restclient) ProcessDelete(id app.ProcessID) error { } func (r *restclient) ProcessCommand(id app.ProcessID, command string) error { - var buf bytes.Buffer + buf := r.pool.Get() + defer r.pool.Put(buf) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(api.Command{ Command: command, }) @@ -142,7 +145,7 @@ func (r *restclient) ProcessCommand(id app.ProcessID, command string) error { query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", &buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/command", query, nil, "application/json", buf) return err } @@ -170,15 +173,16 @@ func (r *restclient) ProcessMetadata(id app.ProcessID, key string) (api.Metadata } func (r *restclient) ProcessMetadataSet(id app.ProcessID, key string, metadata api.Metadata) error { - var buf bytes.Buffer + buf := r.pool.Get() + defer r.pool.Put(buf) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(metadata) query := &url.Values{} query.Set("domain", id.Domain) - _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", &buf) + _, err := r.call("PUT", "/v3/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key), query, nil, "application/json", buf) return err } @@ -201,15 +205,17 @@ func (r *restclient) ProcessProbe(id app.ProcessID) (api.Probe, error) { func (r *restclient) ProcessProbeConfig(p *app.Config) (api.Probe, error) { var probe api.Probe - var buf bytes.Buffer + + buf := r.pool.Get() + defer r.pool.Put(buf) config := api.ProcessConfig{} config.Unmarshal(p, nil) - e := json.NewEncoder(&buf) + e := json.NewEncoder(buf) e.Encode(config) - data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", &buf) + data, err := r.call("POST", "/v3/process/probe", nil, nil, "application/json", buf) if err != nil { return probe, err } diff --git a/http/middleware/hlsrewrite/fixtures/data.txt b/http/middleware/hlsrewrite/fixtures/data.txt new file mode 100644 index 00000000..78bccdda --- /dev/null +++ b/http/middleware/hlsrewrite/fixtures/data.txt @@ -0,0 +1,29 @@ +#EXTM3U +#EXT-X-VERSION:6 +#EXT-X-TARGETDURATION:2 +#EXT-X-MEDIA-SEQUENCE:303 +#EXT-X-INDEPENDENT-SEGMENTS +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:35.019+0200 +/path/to/foobar/test_0_0_0303.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:37.019+0200 +/path/to/foobar/test_0_0_0304.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:39.019+0200 +/path/to/foobar/test_0_0_0305.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:41.019+0200 +/path/to/foobar/test_0_0_0306.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:43.019+0200 +/path/to/foobar/test_0_0_0307.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:45.019+0200 +/path/to/foobar/test_0_0_0308.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:47.019+0200 +/path/to/foobar/test_0_0_0309.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:49.019+0200 +/path/to/foobar/test_0_0_0310.ts diff --git a/http/middleware/hlsrewrite/fixtures/data_rewritten.txt b/http/middleware/hlsrewrite/fixtures/data_rewritten.txt new file mode 100644 index 00000000..a4e2348c --- /dev/null +++ b/http/middleware/hlsrewrite/fixtures/data_rewritten.txt @@ -0,0 +1,29 @@ +#EXTM3U +#EXT-X-VERSION:6 +#EXT-X-TARGETDURATION:2 +#EXT-X-MEDIA-SEQUENCE:303 +#EXT-X-INDEPENDENT-SEGMENTS +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:35.019+0200 +test_0_0_0303.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:37.019+0200 +test_0_0_0304.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:39.019+0200 +test_0_0_0305.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:41.019+0200 +test_0_0_0306.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:43.019+0200 +test_0_0_0307.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:45.019+0200 +test_0_0_0308.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:47.019+0200 +test_0_0_0309.ts +#EXTINF:2.000000, +#EXT-X-PROGRAM-DATE-TIME:2024-10-09T12:56:49.019+0200 +test_0_0_0310.ts diff --git a/http/middleware/hlsrewrite/hlsrewrite.go b/http/middleware/hlsrewrite/hlsrewrite.go index 674228bf..02f21f2c 100644 --- a/http/middleware/hlsrewrite/hlsrewrite.go +++ b/http/middleware/hlsrewrite/hlsrewrite.go @@ -6,6 +6,7 @@ import ( "net/http" "strings" + "github.com/datarhei/core/v16/mem" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" ) @@ -31,7 +32,7 @@ func NewHLSRewrite() echo.MiddlewareFunc { } type hlsrewrite struct { - pathPrefix string + pathPrefix []byte } func NewHLSRewriteWithConfig(config HLSRewriteConfig) echo.MiddlewareFunc { @@ -47,7 +48,7 @@ func NewHLSRewriteWithConfig(config HLSRewriteConfig) echo.MiddlewareFunc { } hls := hlsrewrite{ - pathPrefix: pathPrefix, + pathPrefix: []byte(pathPrefix), } return func(next echo.HandlerFunc) echo.HandlerFunc { @@ -91,6 +92,7 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error { // the data that we need to rewrite. rewriter = &hlsRewriter{ ResponseWriter: res.Writer, + buffer: mem.Get(), } res.Writer = rewriter @@ -104,16 +106,20 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error { res.Writer = writer if rewrite { - if res.Status != 200 { + if res.Status == 200 { + // Rewrite the data befor sending it to the client + buffer := mem.Get() + defer mem.Put(buffer) + + rewriter.rewrite(h.pathPrefix, buffer) + + res.Header().Set("Cache-Control", "private") + res.Write(buffer.Bytes()) + } else { res.Write(rewriter.buffer.Bytes()) - return nil } - // Rewrite the data befor sending it to the client - rewriter.rewrite(h.pathPrefix) - - res.Header().Set("Cache-Control", "private") - res.Write(rewriter.buffer.Bytes()) + mem.Put(rewriter.buffer) } return nil @@ -121,7 +127,7 @@ func (h *hlsrewrite) rewrite(c echo.Context, next echo.HandlerFunc) error { type hlsRewriter struct { http.ResponseWriter - buffer bytes.Buffer + buffer *bytes.Buffer } func (g *hlsRewriter) Write(data []byte) (int, error) { @@ -131,34 +137,29 @@ func (g *hlsRewriter) Write(data []byte) (int, error) { return w, err } -func (g *hlsRewriter) rewrite(pathPrefix string) { - var buffer bytes.Buffer - +func (g *hlsRewriter) rewrite(pathPrefix []byte, buffer *bytes.Buffer) { // Find all URLS in the .m3u8 and add the session ID to the query string - scanner := bufio.NewScanner(&g.buffer) + scanner := bufio.NewScanner(g.buffer) for scanner.Scan() { - line := scanner.Text() + line := scanner.Bytes() // Write empty lines unmodified if len(line) == 0 { - buffer.WriteString(line + "\n") + buffer.Write(line) + buffer.WriteByte('\n') continue } // Write comments unmodified - if strings.HasPrefix(line, "#") { - buffer.WriteString(line + "\n") + if line[0] == '#' { + buffer.Write(line) + buffer.WriteByte('\n') continue } // Rewrite - line = strings.TrimPrefix(line, pathPrefix) - buffer.WriteString(line + "\n") + line = bytes.TrimPrefix(line, pathPrefix) + buffer.Write(line) + buffer.WriteByte('\n') } - - if err := scanner.Err(); err != nil { - return - } - - g.buffer = buffer } diff --git a/http/middleware/hlsrewrite/hlsrewrite_test.go b/http/middleware/hlsrewrite/hlsrewrite_test.go new file mode 100644 index 00000000..7fb5341e --- /dev/null +++ b/http/middleware/hlsrewrite/hlsrewrite_test.go @@ -0,0 +1,49 @@ +package hlsrewrite + +import ( + "bytes" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewrite(t *testing.T) { + data, err := os.ReadFile("./fixtures/data.txt") + require.NoError(t, err) + + rewrittendata, err := os.ReadFile("./fixtures/data_rewritten.txt") + require.NoError(t, err) + + r := &hlsRewriter{ + buffer: &bytes.Buffer{}, + } + + r.Write(data) + + buffer := &bytes.Buffer{} + prefix := []byte("/path/to/foobar/") + r.rewrite(prefix, buffer) + + require.Equal(t, rewrittendata, buffer.Bytes()) +} + +func BenchmarkRewrite(b *testing.B) { + data, err := os.ReadFile("./fixtures/data.txt") + require.NoError(b, err) + + r := &hlsRewriter{ + buffer: &bytes.Buffer{}, + } + + buffer := &bytes.Buffer{} + prefix := []byte("/path/to/foobar/") + + for i := 0; i < b.N; i++ { + r.buffer.Reset() + r.Write(data) + + buffer.Reset() + r.rewrite(prefix, buffer) + } +} diff --git a/io/fs/mem.go b/io/fs/mem.go index ba102ed1..2eb68410 100644 --- a/io/fs/mem.go +++ b/io/fs/mem.go @@ -379,10 +379,12 @@ func (fs *memFilesystem) ReadFile(path string) ([]byte, error) { } } - data := make([]byte, file.data.Len()) - copy(data, file.data.Bytes()) + data := pool.Get() - return data, nil + data.Grow(file.data.Len()) + data.Write(file.data.Bytes()) + + return data.Bytes(), nil } func (fs *memFilesystem) Symlink(oldname, newname string) error { diff --git a/io/fs/s3.go b/io/fs/s3.go index ed9d1ade..c1659aef 100644 --- a/io/fs/s3.go +++ b/io/fs/s3.go @@ -14,6 +14,7 @@ import ( "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/mem" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -275,7 +276,7 @@ func (fs *s3Filesystem) ReadFile(path string) ([]byte, error) { defer file.Close() - buf := &bytes.Buffer{} + buf := mem.Get() // here we take out a buffer for good _, err := buf.ReadFrom(file) if err != nil { @@ -371,11 +372,13 @@ func (fs *s3Filesystem) AppendFileReader(path string, r io.Reader, sizeHint int) return size, err } - buffer := bytes.Buffer{} + buffer := mem.Get() + defer mem.Put(buffer) + buffer.ReadFrom(object) buffer.ReadFrom(r) - size, _, err := fs.write(path, &buffer) + size, _, err := fs.write(path, buffer) return size, err } diff --git a/mem/buffer.go b/mem/buffer.go index bcadc871..af676c10 100644 --- a/mem/buffer.go +++ b/mem/buffer.go @@ -31,3 +31,17 @@ func (p *BufferPool) Get() *bytes.Buffer { func (p *BufferPool) Put(buf *bytes.Buffer) { p.pool.Put(buf) } + +var DefaultBufferPool *BufferPool + +func init() { + DefaultBufferPool = NewBufferPool() +} + +func Get() *bytes.Buffer { + return DefaultBufferPool.Get() +} + +func Put(buf *bytes.Buffer) { + DefaultBufferPool.Put(buf) +} diff --git a/service/api/api.go b/service/api/api.go index 0ec47d86..bae2cbb1 100644 --- a/service/api/api.go +++ b/service/api/api.go @@ -11,6 +11,7 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/mem" ) type API interface { @@ -227,8 +228,10 @@ func (a *api) call(method, path string, body io.Reader) ([]byte, error) { } func (a *api) Monitor(id string, monitordata MonitorData) (MonitorResponse, error) { - var data bytes.Buffer - encoder := json.NewEncoder(&data) + data := mem.Get() + defer mem.Put(data) + + encoder := json.NewEncoder(data) if err := encoder.Encode(monitordata); err != nil { return MonitorResponse{}, err } @@ -240,7 +243,7 @@ func (a *api) Monitor(id string, monitordata MonitorData) (MonitorResponse, erro } */ - response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, &data) + response, err := a.callWithRetry(http.MethodPut, "api/v1/core/monitor/"+id, data) if err != nil { return MonitorResponse{}, fmt.Errorf("error sending request: %w", err) } diff --git a/session/registry.go b/session/registry.go index 1a08b3f0..8a454013 100644 --- a/session/registry.go +++ b/session/registry.go @@ -196,9 +196,9 @@ func (r *registry) sessionPersister(pattern *strftime.Strftime, bufferDuration t "buffer": bufferDuration, }).Log("Session persister started") - buffer := &bytes.Buffer{} path := pattern.FormatString(time.Now()) + buffer := &bytes.Buffer{} enc := json.NewEncoder(buffer) ticker := time.NewTicker(bufferDuration)