improve write performance by

* buffering packets before sending them
* removing mutexes
This commit is contained in:
aler9
2021-12-08 17:46:56 +01:00
parent a1de5ffdf9
commit f3096ec102
20 changed files with 969 additions and 952 deletions

View File

@@ -2,6 +2,7 @@ package base
import (
"bufio"
"bytes"
"fmt"
"io"
"strconv"
@@ -35,15 +36,10 @@ func (b *body) read(header Header, rb *bufio.Reader) error {
return nil
}
func (b body) write(bw *bufio.Writer) error {
func (b body) write(bb *bytes.Buffer) {
if len(b) == 0 {
return nil
return
}
_, err := bw.Write(b)
if err != nil {
return err
}
return nil
bb.Write(b)
}

View File

@@ -3,25 +3,11 @@ package base
import (
"bufio"
"bytes"
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
type limitedBuffer struct {
cap int
n int
}
func (b *limitedBuffer) Write(p []byte) (int, error) {
b.n += len(p)
if b.n > b.cap {
return 0, fmt.Errorf("capacity reached")
}
return len(p), nil
}
var casesBody = []struct {
name string
h Header
@@ -52,19 +38,6 @@ func TestBodyRead(t *testing.T) {
}
}
func TestBodyWrite(t *testing.T) {
for _, ca := range casesBody {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
bw := bufio.NewWriter(&buf)
err := body(ca.byts).write(bw)
require.NoError(t, err)
bw.Flush()
require.Equal(t, ca.byts, buf.Bytes())
})
}
}
func TestBodyReadErrors(t *testing.T) {
for _, ca := range []struct {
name string
@@ -105,8 +78,12 @@ func TestBodyReadErrors(t *testing.T) {
}
}
func TestBodyWriteErrors(t *testing.T) {
bw := bufio.NewWriterSize(&limitedBuffer{cap: 3}, 1)
err := body([]byte("1234")).write(bw)
require.EqualError(t, err, "capacity reached")
func TestBodyWrite(t *testing.T) {
for _, ca := range casesBody {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
body(ca.byts).write(&buf)
require.Equal(t, ca.byts, buf.Bytes())
})
}
}

View File

@@ -2,6 +2,7 @@ package base
import (
"bufio"
"bytes"
"fmt"
"net/http"
"sort"
@@ -97,7 +98,7 @@ func (h *Header) read(rb *bufio.Reader) error {
return nil
}
func (h Header) write(wb *bufio.Writer) error {
func (h Header) write(bb *bytes.Buffer) {
// sort headers by key
// in order to obtain deterministic results
keys := make([]string, len(h))
@@ -108,17 +109,9 @@ func (h Header) write(wb *bufio.Writer) error {
for _, key := range keys {
for _, val := range h[key] {
_, err := wb.Write([]byte(key + ": " + val + "\r\n"))
if err != nil {
return err
}
bb.Write([]byte(key + ": " + val + "\r\n"))
}
}
_, err := wb.Write([]byte("\r\n"))
if err != nil {
return err
}
return nil
bb.Write([]byte("\r\n"))
}

View File

@@ -116,19 +116,6 @@ func TestHeaderRead(t *testing.T) {
}
}
func TestHeaderWrite(t *testing.T) {
for _, ca := range casesHeader {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
bw := bufio.NewWriter(&buf)
err := ca.header.write(bw)
require.NoError(t, err)
bw.Flush()
require.Equal(t, ca.enc, buf.Bytes())
})
}
}
func TestHeaderReadErrors(t *testing.T) {
for _, ca := range []struct {
name string
@@ -186,26 +173,12 @@ func TestHeaderReadErrors(t *testing.T) {
}
}
func TestHeaderWriteErrors(t *testing.T) {
for _, ca := range []struct {
name string
cap int
}{
{
"values",
3,
},
{
"final newline",
12,
},
} {
func TestHeaderWrite(t *testing.T) {
for _, ca := range casesHeader {
t.Run(ca.name, func(t *testing.T) {
bw := bufio.NewWriterSize(&limitedBuffer{cap: ca.cap}, 1)
err := Header{
"Value": HeaderValue{"key"},
}.write(bw)
require.EqualError(t, err, "capacity reached")
var buf bytes.Buffer
ca.header.write(&buf)
require.Equal(t, ca.enc, buf.Bytes())
})
}
}

View File

@@ -2,6 +2,7 @@ package base
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
@@ -96,19 +97,13 @@ func (f *InterleavedFrame) Read(br *bufio.Reader) error {
}
// Write writes an InterleavedFrame into a buffered writer.
func (f InterleavedFrame) Write(bw *bufio.Writer) error {
func (f InterleavedFrame) Write(bb *bytes.Buffer) {
bb.Reset()
buf := []byte{0x24, byte(f.Channel), 0x00, 0x00}
binary.BigEndian.PutUint16(buf[2:], uint16(len(f.Payload)))
_, err := bw.Write(buf)
if err != nil {
return err
}
bb.Write(buf)
_, err = bw.Write(f.Payload)
if err != nil {
return err
}
return bw.Flush()
bb.Write(f.Payload)
}

View File

@@ -45,19 +45,6 @@ func TestInterleavedFrameRead(t *testing.T) {
}
}
func TestInterleavedFrameWrite(t *testing.T) {
for _, ca := range casesInterleavedFrame {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
bw := bufio.NewWriter(&buf)
err := ca.dec.Write(bw)
require.NoError(t, err)
bw.Flush()
require.Equal(t, ca.enc, buf.Bytes())
})
}
}
func TestInterleavedFrameReadErrors(t *testing.T) {
for _, ca := range []struct {
name string
@@ -94,27 +81,12 @@ func TestInterleavedFrameReadErrors(t *testing.T) {
}
}
func TestInterleavedFrameWriteErrors(t *testing.T) {
for _, ca := range []struct {
name string
cap int
}{
{
"header",
3,
},
{
"content",
6,
},
} {
func TestInterleavedFrameWrite(t *testing.T) {
for _, ca := range casesInterleavedFrame {
t.Run(ca.name, func(t *testing.T) {
bw := bufio.NewWriterSize(&limitedBuffer{cap: ca.cap}, 1)
err := InterleavedFrame{
Channel: 3,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Write(bw)
require.EqualError(t, err, "capacity reached")
var buf bytes.Buffer
ca.dec.Write(&buf)
require.Equal(t, ca.enc, buf.Bytes())
})
}
}

View File

@@ -121,33 +121,24 @@ func (req *Request) ReadIgnoreFrames(rb *bufio.Reader, buf []byte) error {
}
// Write writes a request.
func (req Request) Write(bw *bufio.Writer) error {
func (req Request) Write(bb *bytes.Buffer) {
bb.Reset()
urStr := req.URL.CloneWithoutCredentials().String()
_, err := bw.Write([]byte(string(req.Method) + " " + urStr + " " + rtspProtocol10 + "\r\n"))
if err != nil {
return err
}
bb.Write([]byte(string(req.Method) + " " + urStr + " " + rtspProtocol10 + "\r\n"))
if len(req.Body) != 0 {
req.Header["Content-Length"] = HeaderValue{strconv.FormatInt(int64(len(req.Body)), 10)}
}
err = req.Header.write(bw)
if err != nil {
return err
}
req.Header.write(bb)
err = body(req.Body).write(bw)
if err != nil {
return err
}
return bw.Flush()
body(req.Body).write(bb)
}
// String implements fmt.Stringer.
func (req Request) String() string {
buf := bytes.NewBuffer(nil)
req.Write(bufio.NewWriter(buf))
var buf bytes.Buffer
req.Write(&buf)
return buf.String()
}

View File

@@ -143,19 +143,6 @@ func TestRequestRead(t *testing.T) {
}
}
func TestRequestWrite(t *testing.T) {
for _, ca := range casesRequest {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
bw := bufio.NewWriter(&buf)
err := ca.req.Write(bw)
require.NoError(t, err)
// do NOT call flush(), write() must have already done it
require.Equal(t, ca.byts, buf.Bytes())
})
}
}
func TestRequestReadErrors(t *testing.T) {
for _, ca := range []struct {
name string
@@ -231,35 +218,12 @@ func TestRequestReadErrors(t *testing.T) {
}
}
func TestRequestWriteErrors(t *testing.T) {
for _, ca := range []struct {
name string
cap int
}{
{
"first line",
3,
},
{
"header",
53,
},
{
"body",
80,
},
} {
func TestRequestWrite(t *testing.T) {
for _, ca := range casesRequest {
t.Run(ca.name, func(t *testing.T) {
bw := bufio.NewWriterSize(&limitedBuffer{cap: ca.cap}, 1)
err := Request{
Method: "ANNOUNCE",
URL: mustParseURL("rtsp://example.com/media.mp4"),
Header: Header{
"CSeq": HeaderValue{"7"},
},
Body: []byte("abc"),
}.Write(bw)
require.EqualError(t, err, "capacity reached")
var buf bytes.Buffer
ca.req.Write(&buf)
require.Equal(t, ca.byts, buf.Bytes())
})
}
}

View File

@@ -207,40 +207,31 @@ func (res *Response) ReadIgnoreFrames(rb *bufio.Reader, buf []byte) error {
}
// Write writes a Response.
func (res Response) Write(bw *bufio.Writer) error {
func (res Response) Write(bb *bytes.Buffer) {
bb.Reset()
if res.StatusMessage == "" {
if status, ok := statusMessages[res.StatusCode]; ok {
res.StatusMessage = status
}
}
_, err := bw.Write([]byte(rtspProtocol10 + " " +
bb.Write([]byte(rtspProtocol10 + " " +
strconv.FormatInt(int64(res.StatusCode), 10) + " " +
res.StatusMessage + "\r\n"))
if err != nil {
return err
}
if len(res.Body) != 0 {
res.Header["Content-Length"] = HeaderValue{strconv.FormatInt(int64(len(res.Body)), 10)}
}
err = res.Header.write(bw)
if err != nil {
return err
}
res.Header.write(bb)
err = body(res.Body).write(bw)
if err != nil {
return err
}
return bw.Flush()
body(res.Body).write(bb)
}
// String implements fmt.Stringer.
func (res Response) String() string {
buf := bytes.NewBuffer(nil)
res.Write(bufio.NewWriter(buf))
var buf bytes.Buffer
res.Write(&buf)
return buf.String()
}

View File

@@ -105,19 +105,6 @@ func TestResponseRead(t *testing.T) {
}
}
func TestResponseWrite(t *testing.T) {
for _, c := range casesResponse {
t.Run(c.name, func(t *testing.T) {
var buf bytes.Buffer
bw := bufio.NewWriter(&buf)
err := c.res.Write(bw)
require.NoError(t, err)
// do NOT call flush(), write() must have already done it
require.Equal(t, c.byts, buf.Bytes())
})
}
}
func TestResponseReadErrors(t *testing.T) {
for _, ca := range []struct {
name string
@@ -188,35 +175,12 @@ func TestResponseReadErrors(t *testing.T) {
}
}
func TestResponseWriteErrors(t *testing.T) {
for _, ca := range []struct {
name string
cap int
}{
{
"first line",
14,
},
{
"header",
21,
},
{
"body",
49,
},
} {
t.Run(ca.name, func(t *testing.T) {
bw := bufio.NewWriterSize(&limitedBuffer{cap: ca.cap}, 1)
err := Response{
StatusCode: 200,
StatusMessage: "OK",
Header: Header{
"CSeq": HeaderValue{"2"},
},
Body: []byte("abc"),
}.Write(bw)
require.EqualError(t, err, "capacity reached")
func TestResponseWrite(t *testing.T) {
for _, c := range casesResponse {
t.Run(c.name, func(t *testing.T) {
var buf bytes.Buffer
c.res.Write(&buf)
require.Equal(t, c.byts, buf.Bytes())
})
}
}
@@ -244,9 +208,7 @@ func TestResponseWriteAutoFillStatus(t *testing.T) {
)
var buf bytes.Buffer
bw := bufio.NewWriter(&buf)
err := res.Write(bw)
require.NoError(t, err)
res.Write(&buf)
require.Equal(t, byts, buf.Bytes())
}