mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-10-24 00:23:21 +08:00
quicreuse: add some documentation for the package (#3279)
This adds some helpful documentation for the package and explicitly specifies the reuse and association logic for ListenQUICAndAssociate and DialQUIC.
This commit is contained in:
@@ -1,3 +1,7 @@
|
||||
// Package quicreuse provides `quicreuse.ConnManager`, which provides functionality
|
||||
// for reusing QUIC transports for various purposes, like listening & dialing, having
|
||||
// multiple QUIC listeners on the same address with different ALPNs, and sharing the
|
||||
// same address with non QUIC transports like WebRTC.
|
||||
package quicreuse
|
||||
|
||||
import (
|
||||
@@ -34,6 +38,13 @@ type QUICTransport interface {
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// ConnManager implements using the same listen address for both QUIC & WebTransport, reusing
|
||||
// listen addresses for dialing, and provides a PacketConn for sharing the listen address
|
||||
// with other protocols like WebRTC.
|
||||
// Reusing the listen address for dialing helps with address discovery and hole punching. For details
|
||||
// of the reuse logic see `ListenQUICAndAssociate` and `DialQUIC`.
|
||||
// If reuseport is disabled using the `DisableReuseport` option, listen addresses are not used for
|
||||
// dialing.
|
||||
type ConnManager struct {
|
||||
reuseUDP4 *reuse
|
||||
reuseUDP6 *reuse
|
||||
@@ -69,6 +80,7 @@ func defaultSourceIPSelectorFn() (SourceIPSelector, error) {
|
||||
return &netrouteSourceIPSelector{routes: r}, err
|
||||
}
|
||||
|
||||
// NewConnManager returns a new ConnManager
|
||||
func NewConnManager(statelessResetKey quic.StatelessResetKey, tokenKey quic.TokenGeneratorKey, opts ...Option) (*ConnManager, error) {
|
||||
cm := &ConnManager{
|
||||
enableReuseport: true,
|
||||
@@ -160,11 +172,18 @@ func (c *ConnManager) LendTransport(network string, tr QUICTransport, conn net.P
|
||||
return refCountedTr.borrowDoneSignal, reuse.AddTransport(refCountedTr, localAddr)
|
||||
}
|
||||
|
||||
// ListenQUIC listens for quic connections with the provided `tlsConf.NextProtos` ALPNs on `addr`. The same addr can be shared between
|
||||
// different ALPNs.
|
||||
func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (Listener, error) {
|
||||
return c.ListenQUICAndAssociate(nil, addr, tlsConf, allowWindowIncrease)
|
||||
}
|
||||
|
||||
// ListenQUICAndAssociate returns a QUIC listener and associates the underlying transport with the given association.
|
||||
// ListenQUICAndAssociate listens for quic connections with the provided `tlsConf.NextProtos` ALPNs on `addr`. The same addr can be shared between
|
||||
// different ALPNs.
|
||||
// The QUIC Transport used for listening is tagged with the `association`. Any subsequent `TransportWithAssociationForDial`,
|
||||
// or `DialQUIC` calls with the same `association` will reuse the QUIC Transport used by this method.
|
||||
// A common use of associations is to ensure /quic dials use the quic listening address and /webtransport dials use the
|
||||
// WebTransport listening address.
|
||||
func (c *ConnManager) ListenQUICAndAssociate(association any, addr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (Listener, error) {
|
||||
netw, host, err := manet.DialArgs(addr)
|
||||
if err != nil {
|
||||
@@ -230,7 +249,8 @@ func (c *ConnManager) onListenerClosed(key string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConnManager) SharedNonQUICPacketConn(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
|
||||
// SharedNonQUICPacketConn returns a `net.PacketConn` for `laddr` for non QUIC uses.
|
||||
func (c *ConnManager) SharedNonQUICPacketConn(_ string, laddr *net.UDPAddr) (net.PacketConn, error) {
|
||||
c.quicListenersMu.Lock()
|
||||
defer c.quicListenersMu.Unlock()
|
||||
key := laddr.String()
|
||||
@@ -252,7 +272,7 @@ func (c *ConnManager) SharedNonQUICPacketConn(network string, laddr *net.UDPAddr
|
||||
return nil, errors.New("expected to be able to share with a QUIC listener, but the QUIC listener is not using a refcountedTransport. `DisableReuseport` should not be set")
|
||||
}
|
||||
|
||||
func (c *ConnManager) transportForListen(association any, network string, laddr *net.UDPAddr) (refCountedQuicTransport, error) {
|
||||
func (c *ConnManager) transportForListen(association any, network string, laddr *net.UDPAddr) (RefCountedQUICTransport, error) {
|
||||
if c.enableReuseport {
|
||||
reuse, err := c.getReuse(network)
|
||||
if err != nil {
|
||||
@@ -290,6 +310,13 @@ func WithAssociation(ctx context.Context, association any) context.Context {
|
||||
return context.WithValue(ctx, associationKey{}, association)
|
||||
}
|
||||
|
||||
// DialQUIC dials `raddr`. Use `WithAssociation` to select a specific transport that was previously used for listening.
|
||||
// see the documentation for `ListenQUICAndAssociate` for details on associate.
|
||||
// The priority order for reusing the transport is as follows:
|
||||
// - Listening transport with the same association
|
||||
// - Any other listening transport
|
||||
// - Any transport previously used for dialing
|
||||
// If none of these are available, it'll create a new transport.
|
||||
func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (quic.Connection, error) {
|
||||
naddr, v, err := FromQuicMultiaddr(raddr)
|
||||
if err != nil {
|
||||
@@ -310,12 +337,9 @@ func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf
|
||||
return nil, errors.New("unknown QUIC version")
|
||||
}
|
||||
|
||||
var tr refCountedQuicTransport
|
||||
if association := ctx.Value(associationKey{}); association != nil {
|
||||
var tr RefCountedQUICTransport
|
||||
association := ctx.Value(associationKey{})
|
||||
tr, err = c.TransportWithAssociationForDial(association, netw, naddr)
|
||||
} else {
|
||||
tr, err = c.TransportForDial(netw, naddr)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -327,18 +351,23 @@ func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (c *ConnManager) TransportForDial(network string, raddr *net.UDPAddr) (refCountedQuicTransport, error) {
|
||||
// TransportForDial returns a transport for dialing `raddr`.
|
||||
// If reuseport is enabled, it attempts to reuse the QUIC Transport used for
|
||||
// previous listens or dials.
|
||||
func (c *ConnManager) TransportForDial(network string, raddr *net.UDPAddr) (RefCountedQUICTransport, error) {
|
||||
return c.TransportWithAssociationForDial(nil, network, raddr)
|
||||
}
|
||||
|
||||
// TransportWithAssociationForDial returns a QUIC transport for dialing, preferring a transport with the given association.
|
||||
func (c *ConnManager) TransportWithAssociationForDial(association any, network string, raddr *net.UDPAddr) (refCountedQuicTransport, error) {
|
||||
// TransportWithAssociationForDial returns a transport for dialing `raddr`.
|
||||
// If reuseport is enabled, it attempts to reuse the QUIC Transport previously used for listening with `ListenQuicAndAssociate`
|
||||
// with the same `association`. If it fails to do so, it uses any other previously used transport.
|
||||
func (c *ConnManager) TransportWithAssociationForDial(association any, network string, raddr *net.UDPAddr) (RefCountedQUICTransport, error) {
|
||||
if c.enableReuseport {
|
||||
reuse, err := c.getReuse(network)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return reuse.transportWithAssociationForDial(association, network, raddr)
|
||||
return reuse.TransportWithAssociationForDial(association, network, raddr)
|
||||
}
|
||||
|
||||
var laddr *net.UDPAddr
|
||||
@@ -356,6 +385,7 @@ func (c *ConnManager) TransportWithAssociationForDial(association any, network s
|
||||
return &singleOwnerTransport{Transport: &wrappedQUICTransport{&quic.Transport{Conn: conn, StatelessResetKey: &c.srk}}, packetConn: conn}, nil
|
||||
}
|
||||
|
||||
// Protocols returns the supported QUIC protocols. The only supported protocol at the moment is /quic-v1.
|
||||
func (c *ConnManager) Protocols() []int {
|
||||
return []int{ma.P_QUIC_V1}
|
||||
}
|
||||
@@ -374,10 +404,13 @@ func (c *ConnManager) ClientConfig() *quic.Config {
|
||||
return c.clientConfig
|
||||
}
|
||||
|
||||
// wrappedQUICTransport wraps a `quic.Transport` to confirm to `QUICTransport`
|
||||
type wrappedQUICTransport struct {
|
||||
*quic.Transport
|
||||
}
|
||||
|
||||
var _ QUICTransport = (*wrappedQUICTransport)(nil)
|
||||
|
||||
func (t *wrappedQUICTransport) Listen(tlsConf *tls.Config, conf *quic.Config) (QUICListener, error) {
|
||||
return t.Transport.Listen(tlsConf, conf)
|
||||
}
|
||||
|
@@ -97,7 +97,7 @@ func TestConnectionPassedToQUICForListening(t *testing.T) {
|
||||
quicTr, err := cm.transportForListen(nil, netw, naddr)
|
||||
require.NoError(t, err)
|
||||
defer quicTr.Close()
|
||||
if _, ok := quicTr.(*singleOwnerTransport).packetConn.(quic.OOBCapablePacketConn); !ok {
|
||||
if _, ok := quicTr.(*singleOwnerTransport).Transport.(*wrappedQUICTransport).Conn.(quic.OOBCapablePacketConn); !ok {
|
||||
t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn")
|
||||
}
|
||||
}
|
||||
@@ -141,9 +141,17 @@ func TestConnectionPassedToQUICForDialing(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("skipping on windows. Windows doesn't support these optimizations")
|
||||
}
|
||||
cm, err := NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}, DisableReuseport())
|
||||
for _, reuse := range []bool{true, false} {
|
||||
t.Run(fmt.Sprintf("reuseport: %t", reuse), func(t *testing.T) {
|
||||
var cm *ConnManager
|
||||
var err error
|
||||
if reuse {
|
||||
cm, err = NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{})
|
||||
} else {
|
||||
cm, err = NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{}, DisableReuseport())
|
||||
}
|
||||
require.NoError(t, err)
|
||||
defer cm.Close()
|
||||
defer func() { _ = cm.Close() }()
|
||||
|
||||
raddr := ma.StringCast("/ip4/127.0.0.1/udp/1234/quic-v1")
|
||||
|
||||
@@ -155,10 +163,18 @@ func TestConnectionPassedToQUICForDialing(t *testing.T) {
|
||||
quicTr, err := cm.TransportForDial(netw, naddr)
|
||||
|
||||
require.NoError(t, err, "dial error")
|
||||
defer quicTr.Close()
|
||||
if _, ok := quicTr.(*singleOwnerTransport).packetConn.(quic.OOBCapablePacketConn); !ok {
|
||||
defer func() { _ = quicTr.Close() }()
|
||||
if reuse {
|
||||
if _, ok := quicTr.(*refcountedTransport).QUICTransport.(*wrappedQUICTransport).Conn.(quic.OOBCapablePacketConn); !ok {
|
||||
t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn")
|
||||
}
|
||||
} else {
|
||||
if _, ok := quicTr.(*singleOwnerTransport).Transport.(*wrappedQUICTransport).Conn.(quic.OOBCapablePacketConn); !ok {
|
||||
t.Fatal("connection passed to quic-go cannot be type asserted to a *net.UDPConn")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func getTLSConfForProto(t *testing.T, alpn string) (peer.ID, *tls.Config) {
|
||||
|
@@ -31,7 +31,7 @@ type protoConf struct {
|
||||
|
||||
type quicListener struct {
|
||||
l QUICListener
|
||||
transport refCountedQuicTransport
|
||||
transport RefCountedQUICTransport
|
||||
running chan struct{}
|
||||
addrs []ma.Multiaddr
|
||||
|
||||
@@ -39,7 +39,7 @@ type quicListener struct {
|
||||
protocols map[string]protoConf
|
||||
}
|
||||
|
||||
func newQuicListener(tr refCountedQuicTransport, quicConfig *quic.Config) (*quicListener, error) {
|
||||
func newQuicListener(tr RefCountedQUICTransport, quicConfig *quic.Config) (*quicListener, error) {
|
||||
localMultiaddrs := make([]ma.Multiaddr, 0, 2)
|
||||
a, err := ToQuicMultiaddr(tr.LocalAddr(), quic.Version1)
|
||||
if err != nil {
|
||||
|
@@ -10,7 +10,7 @@ import (
|
||||
// non-QUIC packets on a quic.Transport. This lets us reuse this UDP port for
|
||||
// other transports like WebRTC.
|
||||
type nonQUICPacketConn struct {
|
||||
owningTransport refCountedQuicTransport
|
||||
owningTransport RefCountedQUICTransport
|
||||
tr QUICTransport
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
|
@@ -13,7 +13,7 @@ import (
|
||||
"github.com/quic-go/quic-go"
|
||||
)
|
||||
|
||||
type refCountedQuicTransport interface {
|
||||
type RefCountedQUICTransport interface {
|
||||
LocalAddr() net.Addr
|
||||
|
||||
// Used to send packets directly around QUIC. Useful for hole punching.
|
||||
@@ -37,6 +37,7 @@ type singleOwnerTransport struct {
|
||||
}
|
||||
|
||||
var _ QUICTransport = &singleOwnerTransport{}
|
||||
var _ RefCountedQUICTransport = (*singleOwnerTransport)(nil)
|
||||
|
||||
func (c *singleOwnerTransport) IncreaseCount() {}
|
||||
func (c *singleOwnerTransport) DecreaseCount() { c.Transport.Close() }
|
||||
@@ -264,7 +265,7 @@ func (r *reuse) gc() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *reuse) transportWithAssociationForDial(association any, network string, raddr *net.UDPAddr) (*refcountedTransport, error) {
|
||||
func (r *reuse) TransportWithAssociationForDial(association any, network string, raddr *net.UDPAddr) (*refcountedTransport, error) {
|
||||
var ip *net.IP
|
||||
|
||||
// Only bother looking up the source address if we actually _have_ non 0.0.0.0 listeners.
|
||||
@@ -368,7 +369,7 @@ func (r *reuse) AddTransport(tr *refcountedTransport, laddr *net.UDPAddr) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *reuse) AssertTransportExists(tr refCountedQuicTransport) error {
|
||||
func (r *reuse) AssertTransportExists(tr RefCountedQUICTransport) error {
|
||||
t, ok := tr.(*refcountedTransport)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid transport type: expected: *refcountedTransport, got: %T", tr)
|
||||
|
@@ -91,7 +91,7 @@ func TestReuseCreateNewGlobalConnOnDial(t *testing.T) {
|
||||
|
||||
addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
|
||||
require.NoError(t, err)
|
||||
conn, err := reuse.transportWithAssociationForDial(nil, "udp4", addr)
|
||||
conn, err := reuse.TransportWithAssociationForDial(nil, "udp4", addr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, conn.GetCount())
|
||||
laddr := conn.LocalAddr().(*net.UDPAddr)
|
||||
@@ -105,15 +105,15 @@ func TestReuseConnectionWhenDialing(t *testing.T) {
|
||||
|
||||
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
|
||||
require.NoError(t, err)
|
||||
lconn, err := reuse.TransportForListen("udp4", addr)
|
||||
ltr, err := reuse.TransportForListen("udp4", addr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, lconn.GetCount())
|
||||
require.Equal(t, 1, ltr.GetCount())
|
||||
// dial
|
||||
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
|
||||
require.NoError(t, err)
|
||||
conn, err := reuse.transportWithAssociationForDial(nil, "udp4", raddr)
|
||||
tr, err := reuse.TransportWithAssociationForDial(nil, "udp4", raddr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, conn.GetCount())
|
||||
require.Equal(t, 2, tr.GetCount())
|
||||
}
|
||||
|
||||
func TestReuseConnectionWhenListening(t *testing.T) {
|
||||
@@ -122,7 +122,7 @@ func TestReuseConnectionWhenListening(t *testing.T) {
|
||||
|
||||
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
|
||||
require.NoError(t, err)
|
||||
tr, err := reuse.transportWithAssociationForDial(nil, "udp4", raddr)
|
||||
tr, err := reuse.TransportWithAssociationForDial(nil, "udp4", raddr)
|
||||
require.NoError(t, err)
|
||||
laddr := &net.UDPAddr{IP: net.IPv4zero, Port: tr.LocalAddr().(*net.UDPAddr).Port}
|
||||
lconn, err := reuse.TransportForListen("udp4", laddr)
|
||||
@@ -138,7 +138,7 @@ func TestReuseConnectionWhenDialBeforeListen(t *testing.T) {
|
||||
// dial any address
|
||||
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
|
||||
require.NoError(t, err)
|
||||
rTr, err := reuse.transportWithAssociationForDial(nil, "udp4", raddr)
|
||||
rTr, err := reuse.TransportWithAssociationForDial(nil, "udp4", raddr)
|
||||
require.NoError(t, err)
|
||||
|
||||
// open a listener
|
||||
@@ -149,7 +149,7 @@ func TestReuseConnectionWhenDialBeforeListen(t *testing.T) {
|
||||
// new dials should go via the listener connection
|
||||
raddr, err = net.ResolveUDPAddr("udp4", "1.1.1.1:1235")
|
||||
require.NoError(t, err)
|
||||
tr, err := reuse.transportWithAssociationForDial(nil, "udp4", raddr)
|
||||
tr, err := reuse.TransportWithAssociationForDial(nil, "udp4", raddr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lTr, tr)
|
||||
require.Equal(t, 2, tr.GetCount())
|
||||
@@ -183,7 +183,7 @@ func TestReuseListenOnSpecificInterface(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, lconn.GetCount())
|
||||
// dial
|
||||
conn, err := reuse.transportWithAssociationForDial(nil, "udp4", raddr)
|
||||
conn, err := reuse.TransportWithAssociationForDial(nil, "udp4", raddr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, conn.GetCount())
|
||||
}
|
||||
@@ -214,7 +214,7 @@ func TestReuseGarbageCollect(t *testing.T) {
|
||||
|
||||
raddr, err := net.ResolveUDPAddr("udp4", "1.2.3.4:1234")
|
||||
require.NoError(t, err)
|
||||
dTr, err := reuse.transportWithAssociationForDial(nil, "udp4", raddr)
|
||||
dTr, err := reuse.TransportWithAssociationForDial(nil, "udp4", raddr)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, dTr.GetCount())
|
||||
|
||||
|
Reference in New Issue
Block a user