mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-09-26 20:21:26 +08:00
webrtc: support receiving 256kB messages (#3255)
In experiments with js we've found that increasing the message size increases throughput. See: libp2p/specs#628 (comment) for details. This changes the protobuf reader for the stream to read 256kB messages. This also forces a change to the connection SCTP read buffer to be increased to about 2.5 MB, to support 1 message being buffered for 10 streams. This isn't enough to support larger messages. We most likely need to change the inferred SDP of the server to use 256kB maxMessageSize, and need some backwards compatible mechanism in the handshake to opt in to large messages. See: libp2p/specs#628 for details
This commit is contained in:
10
go.mod
10
go.mod
@@ -46,11 +46,11 @@ require (
|
||||
github.com/multiformats/go-varint v0.0.7
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
|
||||
github.com/pion/datachannel v1.5.10
|
||||
github.com/pion/ice/v4 v4.0.6
|
||||
github.com/pion/ice/v4 v4.0.8
|
||||
github.com/pion/logging v0.2.3
|
||||
github.com/pion/sctp v1.8.36
|
||||
github.com/pion/sctp v1.8.37
|
||||
github.com/pion/stun v0.6.1
|
||||
github.com/pion/webrtc/v4 v4.0.10
|
||||
github.com/pion/webrtc/v4 v4.0.14
|
||||
github.com/prometheus/client_golang v1.21.0
|
||||
github.com/prometheus/client_model v0.6.1
|
||||
github.com/quic-go/quic-go v0.50.0
|
||||
@@ -89,8 +89,8 @@ require (
|
||||
github.com/pion/mdns/v2 v2.0.7 // indirect
|
||||
github.com/pion/randutil v0.1.0 // indirect
|
||||
github.com/pion/rtcp v1.2.15 // indirect
|
||||
github.com/pion/rtp v1.8.11 // indirect
|
||||
github.com/pion/sdp/v3 v3.0.10 // indirect
|
||||
github.com/pion/rtp v1.8.13 // indirect
|
||||
github.com/pion/sdp/v3 v3.0.11 // indirect
|
||||
github.com/pion/srtp/v3 v3.0.4 // indirect
|
||||
github.com/pion/stun/v3 v3.0.0 // indirect
|
||||
github.com/pion/transport/v2 v2.2.10 // indirect
|
||||
|
20
go.sum
20
go.sum
@@ -194,8 +194,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
|
||||
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
|
||||
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
|
||||
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
|
||||
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
|
||||
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
|
||||
github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY=
|
||||
github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
|
||||
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
|
||||
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
|
||||
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
|
||||
@@ -207,12 +207,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
|
||||
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
|
||||
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
|
||||
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
|
||||
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
|
||||
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
|
||||
github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0=
|
||||
github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
|
||||
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
|
||||
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
|
||||
github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg=
|
||||
github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
|
||||
github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs=
|
||||
github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
|
||||
github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI=
|
||||
github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
|
||||
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
|
||||
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
|
||||
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
|
||||
@@ -227,8 +227,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1
|
||||
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
|
||||
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
|
||||
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
|
||||
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
|
||||
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
|
||||
github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg=
|
||||
github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
|
@@ -188,7 +188,7 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error
|
||||
dc.Close()
|
||||
return nil, fmt.Errorf("detach channel failed for stream(%d): %w", streamID, err)
|
||||
}
|
||||
str := newStream(dc, rwc, func() { c.removeStream(streamID) })
|
||||
str := newStream(dc, rwc, maxSendMessageSize, func() { c.removeStream(streamID) })
|
||||
if err := c.addStream(str); err != nil {
|
||||
str.Reset()
|
||||
return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", streamID, err)
|
||||
@@ -201,7 +201,7 @@ func (c *connection) AcceptStream() (network.MuxedStream, error) {
|
||||
case <-c.ctx.Done():
|
||||
return nil, c.closeErr
|
||||
case dc := <-c.acceptQueue:
|
||||
str := newStream(dc.channel, dc.stream, func() { c.removeStream(*dc.channel.ID()) })
|
||||
str := newStream(dc.channel, dc.stream, maxSendMessageSize, func() { c.removeStream(*dc.channel.ID()) })
|
||||
if err := c.addStream(str); err != nil {
|
||||
str.Reset()
|
||||
return nil, err
|
||||
|
@@ -253,7 +253,7 @@ func (l *listener) setupConnection(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, func() {})
|
||||
handshakeChannel := newStream(w.HandshakeDataChannel, rwc, maxSendMessageSize, nil)
|
||||
// we do not yet know A's peer ID so accept any inbound
|
||||
remotePubKey, err := l.transport.noiseHandshake(ctx, w.PeerConnection, handshakeChannel, "", crypto.SHA256, true)
|
||||
if err != nil {
|
||||
|
@@ -15,22 +15,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// maxMessageSize is the maximum message size of the Protobuf message we send / receive.
|
||||
maxMessageSize = 16384
|
||||
// maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes.
|
||||
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
|
||||
// per stream is limited to avoid a single stream monopolizing the entire connection.
|
||||
maxSendBuffer = 2 * maxMessageSize
|
||||
// sendBufferLowThreshold is the threshold below which we write more data on the underlying
|
||||
// data channel. We want a notification as soon as we can write 1 full sized message.
|
||||
sendBufferLowThreshold = maxSendBuffer - maxMessageSize
|
||||
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
|
||||
// write on this stream.
|
||||
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
|
||||
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
|
||||
// send queue.
|
||||
maxTotalControlMessagesSize = 50
|
||||
|
||||
// maxSendMessageSize is the maximum message size of the Protobuf message we send / receive.
|
||||
// NOTE: Change `varintOverhead` if you change this.
|
||||
maxSendMessageSize = 16384
|
||||
// Proto overhead assumption is 5 bytes
|
||||
protoOverhead = 5
|
||||
// Varint overhead is assumed to be 2 bytes. This is safe since
|
||||
@@ -40,9 +27,20 @@ const (
|
||||
// is less than or equal to 2 ^ 14, the varint will not be more than
|
||||
// 2 bytes in length.
|
||||
varintOverhead = 2
|
||||
|
||||
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
|
||||
// write on this stream.
|
||||
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
|
||||
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
|
||||
// send queue.
|
||||
maxTotalControlMessagesSize = 50
|
||||
|
||||
// maxFINACKWait is the maximum amount of time a stream will wait to read
|
||||
// FIN_ACK before closing the data channel
|
||||
maxFINACKWait = 10 * time.Second
|
||||
|
||||
// maxReceiveMessageSize is the maximum message size of the Protobuf message we receive.
|
||||
maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer
|
||||
)
|
||||
|
||||
type receiveState uint8
|
||||
@@ -79,11 +77,12 @@ type stream struct {
|
||||
nextMessage *pb.Message
|
||||
receiveState receiveState
|
||||
|
||||
writer pbio.Writer // concurrent writes prevented by mx
|
||||
writeStateChanged chan struct{}
|
||||
sendState sendState
|
||||
writeDeadline time.Time
|
||||
writeError error
|
||||
writer pbio.Writer // concurrent writes prevented by mx
|
||||
writeStateChanged chan struct{}
|
||||
sendState sendState
|
||||
writeDeadline time.Time
|
||||
writeError error
|
||||
maxSendMessageSize int
|
||||
|
||||
controlMessageReaderOnce sync.Once
|
||||
// controlMessageReaderEndTime is the end time for reading FIN_ACK from the control
|
||||
@@ -105,20 +104,21 @@ var _ network.MuxedStream = &stream{}
|
||||
func newStream(
|
||||
channel *webrtc.DataChannel,
|
||||
rwc datachannel.ReadWriteCloser,
|
||||
maxSendMessageSize int,
|
||||
onDone func(),
|
||||
) *stream {
|
||||
s := &stream{
|
||||
reader: pbio.NewDelimitedReader(rwc, maxMessageSize),
|
||||
writer: pbio.NewDelimitedWriter(rwc),
|
||||
writeStateChanged: make(chan struct{}, 1),
|
||||
id: *channel.ID(),
|
||||
dataChannel: rwc.(*datachannel.DataChannel),
|
||||
onDone: onDone,
|
||||
reader: pbio.NewDelimitedReader(rwc, maxReceiveMessageSize),
|
||||
writer: pbio.NewDelimitedWriter(rwc),
|
||||
writeStateChanged: make(chan struct{}, 1),
|
||||
id: *channel.ID(),
|
||||
dataChannel: rwc.(*datachannel.DataChannel),
|
||||
onDone: onDone,
|
||||
maxSendMessageSize: maxSendMessageSize,
|
||||
}
|
||||
s.dataChannel.SetBufferedAmountLowThreshold(sendBufferLowThreshold)
|
||||
s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold()))
|
||||
s.dataChannel.OnBufferedAmountLow(func() {
|
||||
s.notifyWriteStateChanged()
|
||||
|
||||
})
|
||||
return s
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package libp2pwebrtc
|
||||
import (
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
@@ -148,8 +149,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
var clientDone, serverDone atomic.Bool
|
||||
clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) })
|
||||
serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) })
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) })
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) })
|
||||
|
||||
// send a foobar from the client
|
||||
n, err := clientStr.Write([]byte("foobar"))
|
||||
@@ -194,8 +195,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
|
||||
func TestStreamPartialReads(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
clientStr := newStream(client.dc, client.rwc, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
|
||||
_, err := serverStr.Write([]byte("foobar"))
|
||||
require.NoError(t, err)
|
||||
@@ -217,8 +218,8 @@ func TestStreamPartialReads(t *testing.T) {
|
||||
func TestStreamSkipEmptyFrames(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
clientStr := newStream(client.dc, client.rwc, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
require.NoError(t, serverStr.writer.WriteMsg(&pb.Message{}))
|
||||
@@ -252,7 +253,7 @@ func TestStreamSkipEmptyFrames(t *testing.T) {
|
||||
func TestStreamReadReturnsOnClose(t *testing.T) {
|
||||
client, _ := getDetachedDataChannels(t)
|
||||
|
||||
clientStr := newStream(client.dc, client.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := clientStr.Read([]byte{0})
|
||||
@@ -275,8 +276,8 @@ func TestStreamResets(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
var clientDone, serverDone atomic.Bool
|
||||
clientStr := newStream(client.dc, client.rwc, func() { clientDone.Store(true) })
|
||||
serverStr := newStream(server.dc, server.rwc, func() { serverDone.Store(true) })
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { clientDone.Store(true) })
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { serverDone.Store(true) })
|
||||
|
||||
// send a foobar from the client
|
||||
_, err := clientStr.Write([]byte("foobar"))
|
||||
@@ -311,8 +312,8 @@ func TestStreamResets(t *testing.T) {
|
||||
func TestStreamReadDeadlineAsync(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
clientStr := newStream(client.dc, client.rwc, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
|
||||
timeout := 100 * time.Millisecond
|
||||
if os.Getenv("CI") != "" {
|
||||
@@ -342,8 +343,8 @@ func TestStreamReadDeadlineAsync(t *testing.T) {
|
||||
func TestStreamWriteDeadlineAsync(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
clientStr := newStream(client.dc, client.rwc, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
_ = serverStr
|
||||
|
||||
b := make([]byte, 1024)
|
||||
@@ -372,8 +373,8 @@ func TestStreamWriteDeadlineAsync(t *testing.T) {
|
||||
func TestStreamReadAfterClose(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
clientStr := newStream(client.dc, client.rwc, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
|
||||
serverStr.Close()
|
||||
b := make([]byte, 1)
|
||||
@@ -384,8 +385,8 @@ func TestStreamReadAfterClose(t *testing.T) {
|
||||
|
||||
client, server = getDetachedDataChannels(t)
|
||||
|
||||
clientStr = newStream(client.dc, client.rwc, func() {})
|
||||
serverStr = newStream(server.dc, server.rwc, func() {})
|
||||
clientStr = newStream(client.dc, client.rwc, maxSendMessageSize, func() {})
|
||||
serverStr = newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
|
||||
serverStr.Reset()
|
||||
b = make([]byte, 1)
|
||||
@@ -399,8 +400,8 @@ func TestStreamCloseAfterFINACK(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
done := make(chan bool, 1)
|
||||
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
|
||||
go func() {
|
||||
err := clientStr.Close()
|
||||
@@ -427,8 +428,8 @@ func TestStreamFinAckAfterStopSending(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
done := make(chan bool, 1)
|
||||
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() {})
|
||||
|
||||
go func() {
|
||||
clientStr.CloseRead()
|
||||
@@ -460,8 +461,8 @@ func TestStreamConcurrentClose(t *testing.T) {
|
||||
|
||||
start := make(chan bool, 2)
|
||||
done := make(chan bool, 2)
|
||||
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
|
||||
serverStr := newStream(server.dc, server.rwc, func() { done <- true })
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
|
||||
serverStr := newStream(server.dc, server.rwc, maxSendMessageSize, func() { done <- true })
|
||||
|
||||
go func() {
|
||||
start <- true
|
||||
@@ -495,7 +496,7 @@ func TestStreamResetAfterClose(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
done := make(chan bool, 2)
|
||||
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
|
||||
clientStr.Close()
|
||||
|
||||
select {
|
||||
@@ -520,7 +521,7 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
|
||||
done := make(chan bool, 1)
|
||||
clientStr := newStream(client.dc, client.rwc, func() { done <- true })
|
||||
clientStr := newStream(client.dc, client.rwc, maxSendMessageSize, func() { done <- true })
|
||||
|
||||
clientStr.Close()
|
||||
|
||||
@@ -540,24 +541,35 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStreamChunking(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
for _, msgSize := range []int{16 << 10, 32 << 10, 64 << 10, 128 << 10, 256 << 10} {
|
||||
t.Run(fmt.Sprintf("msgSize=%d", msgSize), func(t *testing.T) {
|
||||
client, server := getDetachedDataChannels(t)
|
||||
defer client.dc.Close()
|
||||
defer server.dc.Close()
|
||||
|
||||
clientStr := newStream(client.dc, client.rwc, func() {})
|
||||
serverStr := newStream(server.dc, server.rwc, func() {})
|
||||
clientStr := newStream(client.dc, client.rwc, msgSize, nil)
|
||||
// server should read large messages even if it can only send 16 kB messages.
|
||||
serverStr := newStream(server.dc, server.rwc, 16<<10, nil)
|
||||
|
||||
const N = (16 << 10) + 1000
|
||||
go func() {
|
||||
data := make([]byte, N)
|
||||
_, err := clientStr.Write(data)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
N := msgSize + 1000
|
||||
input := make([]byte, N)
|
||||
_, err := rand.Read(input)
|
||||
require.NoError(t, err)
|
||||
go func() {
|
||||
n, err := clientStr.Write(input)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, n, len(input))
|
||||
}()
|
||||
|
||||
data := make([]byte, N)
|
||||
n, err := serverStr.Read(data)
|
||||
require.NoError(t, err)
|
||||
require.LessOrEqual(t, n, 16<<10)
|
||||
|
||||
nn, err := serverStr.Read(data)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, nn+n, N)
|
||||
data := make([]byte, N)
|
||||
n, err := serverStr.Read(data)
|
||||
require.NoError(t, err)
|
||||
require.LessOrEqual(t, n, msgSize)
|
||||
// shouldn't be much less than msgSize
|
||||
require.GreaterOrEqual(t, n, msgSize-100)
|
||||
_, err = serverStr.Read(data[n:])
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, input, data)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@@ -84,7 +84,7 @@ func (s *stream) Write(b []byte) (int, error) {
|
||||
s.mx.Lock()
|
||||
continue
|
||||
}
|
||||
end := maxMessageSize
|
||||
end := s.maxSendMessageSize
|
||||
if end > availableSpace {
|
||||
end = availableSpace
|
||||
}
|
||||
@@ -110,11 +110,24 @@ func (s *stream) SetWriteDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendBufferSize() is the maximum data we enqueue on the underlying data channel for writes.
|
||||
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
|
||||
// per stream is limited to avoid a single stream monopolizing the entire connection.
|
||||
func (s *stream) sendBufferSize() int {
|
||||
return 2 * s.maxSendMessageSize
|
||||
}
|
||||
|
||||
// sendBufferLowThreshold() is the threshold below which we write more data on the underlying
|
||||
// data channel. We want a notification as soon as we can write 1 full sized message.
|
||||
func (s *stream) sendBufferLowThreshold() int {
|
||||
return s.sendBufferSize() - s.maxSendMessageSize
|
||||
}
|
||||
|
||||
func (s *stream) availableSendSpace() int {
|
||||
buffered := int(s.dataChannel.BufferedAmount())
|
||||
availableSpace := maxSendBuffer - buffered
|
||||
availableSpace := s.sendBufferSize() - buffered
|
||||
if availableSpace+maxTotalControlMessagesSize < 0 { // this should never happen, but better check
|
||||
log.Errorw("data channel buffered more data than the maximum amount", "max", maxSendBuffer, "buffered", buffered)
|
||||
log.Errorw("data channel buffered more data than the maximum amount", "max", s.sendBufferSize(), "buffered", buffered)
|
||||
}
|
||||
return availableSpace
|
||||
}
|
||||
|
@@ -68,7 +68,13 @@ const (
|
||||
DefaultFailedTimeout = 30 * time.Second
|
||||
DefaultKeepaliveTimeout = 15 * time.Second
|
||||
|
||||
sctpReceiveBufferSize = 100_000
|
||||
// sctpReceiveBufferSize is the size of the buffer for incoming messages.
|
||||
//
|
||||
// This is enough space for enqueuing 10 full sized messages.
|
||||
// Besides throughput, this only matters if an application is using multiple dependent
|
||||
// streams, say streams 1 & 2. It reads from stream 1 only after receiving message from
|
||||
// stream 2. A buffer of 10 messages should serve all such situations.
|
||||
sctpReceiveBufferSize = 10 * maxReceiveMessageSize
|
||||
)
|
||||
|
||||
type WebRTCTransport struct {
|
||||
@@ -367,7 +373,7 @@ func (t *WebRTCTransport) dial(ctx context.Context, scope network.ConnManagement
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
channel := newStream(w.HandshakeDataChannel, detached, func() {})
|
||||
channel := newStream(w.HandshakeDataChannel, detached, maxSendMessageSize, nil)
|
||||
|
||||
remotePubKey, err := t.noiseHandshake(ctx, w.PeerConnection, channel, p, remoteHashFunction, false)
|
||||
if err != nil {
|
||||
|
@@ -546,7 +546,7 @@ func TestTransportWebRTC_Deadline(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
stream.SetWriteDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
largeBuffer := make([]byte, 2*1024*1024)
|
||||
largeBuffer := make([]byte, 20*1024*1024)
|
||||
_, err = stream.Write(largeBuffer)
|
||||
require.ErrorIs(t, err, os.ErrDeadlineExceeded)
|
||||
|
||||
|
@@ -59,22 +59,22 @@ require (
|
||||
github.com/pion/datachannel v1.5.10 // indirect
|
||||
github.com/pion/dtls/v2 v2.2.12 // indirect
|
||||
github.com/pion/dtls/v3 v3.0.4 // indirect
|
||||
github.com/pion/ice/v4 v4.0.6 // indirect
|
||||
github.com/pion/ice/v4 v4.0.8 // indirect
|
||||
github.com/pion/interceptor v0.1.37 // indirect
|
||||
github.com/pion/logging v0.2.3 // indirect
|
||||
github.com/pion/mdns/v2 v2.0.7 // indirect
|
||||
github.com/pion/randutil v0.1.0 // indirect
|
||||
github.com/pion/rtcp v1.2.15 // indirect
|
||||
github.com/pion/rtp v1.8.11 // indirect
|
||||
github.com/pion/sctp v1.8.36 // indirect
|
||||
github.com/pion/sdp/v3 v3.0.10 // indirect
|
||||
github.com/pion/rtp v1.8.13 // indirect
|
||||
github.com/pion/sctp v1.8.37 // indirect
|
||||
github.com/pion/sdp/v3 v3.0.11 // indirect
|
||||
github.com/pion/srtp/v3 v3.0.4 // indirect
|
||||
github.com/pion/stun v0.6.1 // indirect
|
||||
github.com/pion/stun/v3 v3.0.0 // indirect
|
||||
github.com/pion/transport/v2 v2.2.10 // indirect
|
||||
github.com/pion/transport/v3 v3.0.7 // indirect
|
||||
github.com/pion/turn/v4 v4.0.0 // indirect
|
||||
github.com/pion/webrtc/v4 v4.0.10 // indirect
|
||||
github.com/pion/webrtc/v4 v4.0.14 // indirect
|
||||
github.com/prometheus/client_golang v1.21.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.62.0 // indirect
|
||||
|
@@ -193,8 +193,8 @@ github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
|
||||
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
|
||||
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
|
||||
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
|
||||
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
|
||||
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
|
||||
github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY=
|
||||
github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
|
||||
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
|
||||
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
|
||||
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
|
||||
@@ -206,12 +206,12 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
|
||||
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
|
||||
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
|
||||
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
|
||||
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
|
||||
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
|
||||
github.com/pion/sctp v1.8.36 h1:owNudmnz1xmhfYje5L/FCav3V9wpPRePHle3Zi+P+M0=
|
||||
github.com/pion/sctp v1.8.36/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
|
||||
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
|
||||
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
|
||||
github.com/pion/rtp v1.8.13 h1:8uSUPpjSL4OlwZI8Ygqu7+h2p9NPFB+yAZ461Xn5sNg=
|
||||
github.com/pion/rtp v1.8.13/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
|
||||
github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs=
|
||||
github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
|
||||
github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI=
|
||||
github.com/pion/sdp/v3 v3.0.11/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
|
||||
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
|
||||
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
|
||||
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
|
||||
@@ -226,8 +226,8 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1
|
||||
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
|
||||
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
|
||||
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
|
||||
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
|
||||
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
|
||||
github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg=
|
||||
github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
|
Reference in New Issue
Block a user