mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-09-26 20:21:26 +08:00
connmgr: fix transport association bug (#3221)
reuse port didn't work for the second transport, either QUIC or WebTransport, that was used for listening. This change fixes it by calling associate on all paths. This impacted hole punching for some users since you cannot hole punch without reuse port. There's a test in holepunch package to prevent regressions. Fixes #3165 Co-authored-by: Marco Munizaga <git@marcopolo.io>
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@@ -682,3 +683,98 @@ func SetLegacyBehavior(legacyBehavior bool) holepunch.Option {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TestEndToEndSimConnectQUICReuse tests that hole punching works if we are
|
||||
// reusing the same port for QUIC and WebTransport, and when we have multiple
|
||||
// QUIC listeners on different ports.
|
||||
//
|
||||
// If this tests fails or is flaky it may be because:
|
||||
// - The quicreuse logic (and association logic) is not returning the appropriate transport for holepunching.
|
||||
// - The ordering of listeners is unexpected (remember the swarm will sort the listeners with `.ListenOrder()`).
|
||||
func TestEndToEndSimConnectQUICReuse(t *testing.T) {
|
||||
h1tr := &mockEventTracer{}
|
||||
h2tr := &mockEventTracer{}
|
||||
|
||||
router := &simconn.SimpleFirewallRouter{}
|
||||
relay := MustNewHost(t,
|
||||
quicSimConn(true, router),
|
||||
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
|
||||
libp2p.DisableRelay(),
|
||||
libp2p.ResourceManager(&network.NullResourceManager{}),
|
||||
libp2p.WithFxOption(fx.Invoke(func(h host.Host) {
|
||||
// Setup relay service
|
||||
_, err := relayv2.New(h)
|
||||
require.NoError(t, err)
|
||||
})),
|
||||
)
|
||||
|
||||
// We return addrs of quic on port 8001 and circuit.
|
||||
// This lets us listen on other ports for QUIC in order to confuse the quicreuse logic during hole punching.
|
||||
onlyQuicOnPort8001AndCircuit := func(addrs []ma.Multiaddr) []ma.Multiaddr {
|
||||
return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool {
|
||||
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
|
||||
isCircuit := err == nil
|
||||
if isCircuit {
|
||||
return false
|
||||
}
|
||||
_, err = a.ValueForProtocol(ma.P_QUIC_V1)
|
||||
isQuic := err == nil
|
||||
if !isQuic {
|
||||
return true
|
||||
}
|
||||
port, err := a.ValueForProtocol(ma.P_UDP)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
isPort8001 := port == "8001"
|
||||
return !isPort8001
|
||||
})
|
||||
}
|
||||
|
||||
h1 := MustNewHost(t,
|
||||
quicSimConn(false, router),
|
||||
libp2p.EnableHolePunching(holepunch.WithTracer(h1tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
|
||||
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1/webtransport")),
|
||||
libp2p.ResourceManager(&network.NullResourceManager{}),
|
||||
libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit),
|
||||
libp2p.ForceReachabilityPrivate(),
|
||||
)
|
||||
// Listen on quic *after* listening on webtransport.
|
||||
// This is to test that the quicreuse logic is not returning the wrong transport.
|
||||
// See: https://github.com/libp2p/go-libp2p/issues/3165#issuecomment-2700126706 for details.
|
||||
h1.Network().Listen(
|
||||
ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1"),
|
||||
ma.StringCast("/ip4/2.2.0.1/udp/9001/quic-v1"),
|
||||
)
|
||||
|
||||
h2 := MustNewHost(t,
|
||||
quicSimConn(false, router),
|
||||
libp2p.ListenAddrs(
|
||||
ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1/webtransport"),
|
||||
),
|
||||
libp2p.ResourceManager(&network.NullResourceManager{}),
|
||||
connectToRelay(&relay),
|
||||
libp2p.EnableHolePunching(holepunch.WithTracer(h2tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
|
||||
libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit),
|
||||
libp2p.ForceReachabilityPrivate(),
|
||||
)
|
||||
// Listen on quic after listening on webtransport.
|
||||
h2.Network().Listen(
|
||||
ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1"),
|
||||
ma.StringCast("/ip4/2.2.0.2/udp/9001/quic-v1"),
|
||||
)
|
||||
|
||||
defer h1.Close()
|
||||
defer h2.Close()
|
||||
defer relay.Close()
|
||||
|
||||
// Wait for holepunch service to start
|
||||
waitForHolePunchingSvcActive(t, h1)
|
||||
waitForHolePunchingSvcActive(t, h2)
|
||||
|
||||
learnAddrs(h1, h2)
|
||||
pingAtoB(t, h1, h2)
|
||||
|
||||
// wait till a direct connection is complete
|
||||
ensureDirectConn(t, h1, h2)
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
@@ -190,6 +191,18 @@ func (c *ConnManager) ListenQUICAndAssociate(association any, addr ma.Multiaddr,
|
||||
}
|
||||
key = tr.LocalAddr().String()
|
||||
entry = quicListenerEntry{ln: ln}
|
||||
} else if c.enableReuseport && association != nil {
|
||||
reuse, err := c.getReuse(netw)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reuse error: %w", err)
|
||||
}
|
||||
err = reuse.AssertTransportExists(entry.ln.transport)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reuse assert transport failed: %w", err)
|
||||
}
|
||||
if tr, ok := entry.ln.transport.(*refcountedTransport); ok {
|
||||
tr.associate(association)
|
||||
}
|
||||
}
|
||||
l, err := entry.ln.Add(tlsConf, allowWindowIncrease, func() { c.onListenerClosed(key) })
|
||||
if err != nil {
|
||||
|
@@ -315,3 +315,59 @@ func TestExternalTransport(t *testing.T) {
|
||||
t.Fatal("doneWithTr not closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAssociate(t *testing.T) {
|
||||
testAssociate := func(lnAddr1, lnAddr2 ma.Multiaddr, dialAddr *net.UDPAddr) {
|
||||
cm, err := NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{})
|
||||
require.NoError(t, err)
|
||||
defer cm.Close()
|
||||
|
||||
lp2pTLS := &tls.Config{NextProtos: []string{"libp2p"}}
|
||||
assoc1 := "test-1"
|
||||
ln1, err := cm.ListenQUICAndAssociate(assoc1, lnAddr1, lp2pTLS, nil)
|
||||
require.NoError(t, err)
|
||||
defer ln1.Close()
|
||||
addrs := ln1.Multiaddrs()
|
||||
require.Len(t, addrs, 1)
|
||||
|
||||
addr := addrs[0]
|
||||
assoc2 := "test-2"
|
||||
h3TLS := &tls.Config{NextProtos: []string{"h3"}}
|
||||
ln2, err := cm.ListenQUICAndAssociate(assoc2, addr, h3TLS, nil)
|
||||
require.NoError(t, err)
|
||||
defer ln2.Close()
|
||||
|
||||
tr1, err := cm.TransportWithAssociationForDial(assoc1, "udp4", dialAddr)
|
||||
require.NoError(t, err)
|
||||
defer tr1.Close()
|
||||
require.Equal(t, tr1.LocalAddr().String(), ln1.Addr().String())
|
||||
|
||||
tr2, err := cm.TransportWithAssociationForDial(assoc2, "udp4", dialAddr)
|
||||
require.NoError(t, err)
|
||||
defer tr2.Close()
|
||||
require.Equal(t, tr2.LocalAddr().String(), ln2.Addr().String())
|
||||
|
||||
ln3, err := cm.ListenQUICAndAssociate(assoc1, lnAddr2, lp2pTLS, nil)
|
||||
require.NoError(t, err)
|
||||
defer ln3.Close()
|
||||
|
||||
// an unused association should also return the same transport
|
||||
// association is only a preference for a specific transport, not an exclusion criteria
|
||||
tr3, err := cm.TransportWithAssociationForDial("unused", "udp4", dialAddr)
|
||||
require.NoError(t, err)
|
||||
defer tr3.Close()
|
||||
require.Contains(t, []string{ln2.Addr().String(), ln3.Addr().String()}, tr3.LocalAddr().String())
|
||||
}
|
||||
|
||||
t.Run("MultipleUnspecifiedListeners", func(t *testing.T) {
|
||||
testAssociate(ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"),
|
||||
ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"),
|
||||
&net.UDPAddr{IP: net.IPv4(1, 1, 1, 1), Port: 1})
|
||||
})
|
||||
t.Run("MultipleSpecificListeners", func(t *testing.T) {
|
||||
testAssociate(ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
|
||||
ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
|
||||
&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1},
|
||||
)
|
||||
})
|
||||
}
|
||||
|
@@ -303,6 +303,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source *
|
||||
return tr, nil
|
||||
}
|
||||
}
|
||||
// We don't have a transport with the association, use any one
|
||||
for _, tr := range trs {
|
||||
return tr, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -313,6 +317,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source *
|
||||
return tr, nil
|
||||
}
|
||||
}
|
||||
// We don't have a transport with the association, use any one
|
||||
for _, tr := range r.globalListeners {
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
// Use a transport we've previously dialed from
|
||||
for _, tr := range r.globalDialers {
|
||||
@@ -360,6 +368,33 @@ func (r *reuse) AddTransport(tr *refcountedTransport, laddr *net.UDPAddr) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *reuse) AssertTransportExists(tr refCountedQuicTransport) error {
|
||||
t, ok := tr.(*refcountedTransport)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid transport type: expected: *refcountedTransport, got: %T", tr)
|
||||
}
|
||||
laddr := t.LocalAddr().(*net.UDPAddr)
|
||||
if laddr.IP.IsUnspecified() {
|
||||
if lt, ok := r.globalListeners[laddr.Port]; ok {
|
||||
if lt == t {
|
||||
return nil
|
||||
}
|
||||
return errors.New("two global listeners on the same port")
|
||||
}
|
||||
return errors.New("transport not found")
|
||||
}
|
||||
if m, ok := r.unicast[laddr.IP.String()]; ok {
|
||||
if lt, ok := m[laddr.Port]; ok {
|
||||
if lt == t {
|
||||
return nil
|
||||
}
|
||||
return errors.New("two unicast listeners on same ip:port")
|
||||
}
|
||||
return errors.New("transport not found")
|
||||
}
|
||||
return errors.New("transport not found")
|
||||
}
|
||||
|
||||
func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcountedTransport, error) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
Reference in New Issue
Block a user