feat(network): Add Conn.As

ConnAs works in a similar way to errors.As. It allows a user to cut
through the interface layers and extract a specific type of connection
if available.

This serves as a sort of escape hatch to allow users to leverage some
connection specific feature without having to support that feature for
all connections. Getting RTT information is one example.

It also allows us, within the library, to get specific types of
connections out of the interface box. This would have been useful in the
recent changes in tcpreuse. See
https://github.com/libp2p/go-libp2p/pull/3181 and
https://github.com/libp2p/go-libp2p/pull/3142.

Getting access to the underlying type can lead to hard to debug issues.
For example, if a user mutates connection state on the underlying type,
hooks that relied on only mutating that state from the wrapped
connection would never be called.

It is up to the user to ensure they are using this safely.
This commit is contained in:
Marco Munizaga
2025-07-17 11:35:06 -07:00
committed by Marco Munizaga
parent 3ecae665f1
commit b3f1e66e26
12 changed files with 159 additions and 0 deletions

View File

@@ -87,6 +87,20 @@ type Conn interface {
// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool
// As finds the first conn in Conn's wrapped types that matches target, and
// if one is found, sets target to that conn value and returns true.
// Otherwise, it returns false. Similar to errors.As.
//
// target must be a pointer to the type you are matching against.
//
// This is an EXPERIMENTAL API. Getting access to the underlying type can
// lead to hard to debug issues. For example, if you mutate connection state
// on the underlying type, hooks that relied on only mutating that state
// from the wrapped connection would never be called.
//
// You very likely do not need to use this method.
As(target any) bool
}
// ConnectionState holds information about the connection.

View File

@@ -137,6 +137,20 @@ type MuxedConn interface {
// AcceptStream accepts a stream opened by the other side.
AcceptStream() (MuxedStream, error)
// As finds the first conn in MuxedConn's wrapped types that matches target,
// and if one is found, sets target to that conn value and returns true.
// Otherwise, it returns false. Similar to errors.As.
//
// target must be a pointer to the type you are matching against.
//
// This is an EXPERIMENTAL API. Getting access to the underlying type can
// lead to hard to debug issues. For example, if you mutate connection state
// on the underlying type, hooks that relied on only mutating that state
// from the wrapped connection would never be called.
//
// You very likely do not need to use this method.
As(target any) bool
}
// Multiplexer wraps a net.Conn with a stream multiplexing

View File

@@ -40,6 +40,10 @@ import (
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/libp2p/go-libp2p/p2p/transport/websocket"
webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/libp2p/go-yamux/v5"
"github.com/pion/webrtc/v4"
quicgo "github.com/quic-go/quic-go"
wtgo "github.com/quic-go/webtransport-go"
"go.uber.org/goleak"
ma "github.com/multiformats/go-multiaddr"
@@ -842,3 +846,76 @@ func BenchmarkAllAddrs(b *testing.B) {
addrsHost.AllAddrs()
}
}
func TestConnAs(t *testing.T) {
type testCase struct {
name string
listenAddr string
testAs func(t *testing.T, c network.Conn)
}
testCases := []testCase{
{
"QUIC",
"/ip4/0.0.0.0/udp/0/quic-v1",
func(t *testing.T, c network.Conn) {
var quicConn *quicgo.Conn
require.True(t, c.As(&quicConn))
},
},
{
"TCP+Yamux",
"/ip4/0.0.0.0/tcp/0",
func(t *testing.T, c network.Conn) {
var yamuxSession *yamux.Session
require.True(t, c.As(&yamuxSession))
},
},
{
"WebRTC",
"/ip4/0.0.0.0/udp/0/webrtc-direct",
func(t *testing.T, c network.Conn) {
var webrtcPC *webrtc.PeerConnection
require.True(t, c.As(&webrtcPC))
},
},
{
"WebTransport Session",
"/ip4/0.0.0.0/udp/0/quic-v1/webtransport",
func(t *testing.T, c network.Conn) {
var s *wtgo.Session
require.True(t, c.As(&s))
},
},
{
"WebTransport QUIC Conn",
"/ip4/0.0.0.0/udp/0/quic-v1/webtransport",
func(t *testing.T, c network.Conn) {
var quicConn *quicgo.Conn
require.True(t, c.As(&quicConn))
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h1, err := New(ListenAddrStrings(
tc.listenAddr,
))
require.NoError(t, err)
defer h1.Close()
h2, err := New(ListenAddrStrings(
tc.listenAddr,
))
require.NoError(t, err)
defer h2.Close()
err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)
c := h1.Network().ConnsToPeer(h2.ID())[0]
tc.testAs(t, c)
})
}
}

View File

@@ -13,6 +13,14 @@ type conn yamux.Session
var _ network.MuxedConn = &conn{}
func (c *conn) As(target any) bool {
if t, ok := target.(**yamux.Session); ok {
*t = (*yamux.Session)(c)
return true
}
return false
}
// NewMuxedConn constructs a new MuxedConn from a yamux.Session.
func NewMuxedConn(m *yamux.Session) network.MuxedConn {
return (*conn)(m)

View File

@@ -818,6 +818,7 @@ func (m mockConn) NewStream(_ context.Context) (network.Stream, error) { panic("
func (m mockConn) GetStreams() []network.Stream { panic("implement me") }
func (m mockConn) Scope() network.ConnScope { panic("implement me") }
func (m mockConn) ConnState() network.ConnectionState { return network.ConnectionState{} }
func (m mockConn) As(_ any) bool { return false }
func makeSegmentsWithPeerInfos(peerInfos peerInfos) *segments {
var s = func() *segments {

View File

@@ -86,6 +86,10 @@ func (c *conn) Close() error {
return nil
}
func (c *conn) As(_ any) bool {
return false
}
func (c *conn) teardown() {
for _, s := range c.allStreams() {
s.Reset()

View File

@@ -833,6 +833,10 @@ func wrapWithMetrics(capableConn transport.CapableConn, metricsTracer MetricsTra
return c
}
func (c *connWithMetrics) As(target any) bool {
return c.CapableConn.As(target)
}
func (c *connWithMetrics) completedHandshake() {
c.metricsTracer.CompletedHandshake(time.Since(c.opened), c.ConnState(), c.LocalMultiaddr())
}

View File

@@ -42,6 +42,10 @@ type Conn struct {
var _ network.Conn = &Conn{}
func (c *Conn) As(target any) bool {
return c.conn.As(target)
}
func (c *Conn) IsClosed() bool {
return c.conn.IsClosed()
}

View File

@@ -23,6 +23,10 @@ type transportConn struct {
var _ transport.CapableConn = &transportConn{}
func (c *transportConn) As(target any) bool {
return c.MuxedConn.As(target)
}
func (t *transportConn) Transport() transport.Transport {
return t.transport
}

View File

@@ -25,6 +25,15 @@ type conn struct {
remoteMultiaddr ma.Multiaddr
}
func (c *conn) As(target any) bool {
if t, ok := target.(**quic.Conn); ok {
*t = c.quicConn
return true
}
return false
}
var _ tpt.CapableConn = &conn{}
// Close closes the connection.

View File

@@ -132,6 +132,14 @@ func (c *connection) Close() error {
return nil
}
func (c *connection) As(target any) bool {
if target, ok := target.(**webrtc.PeerConnection); ok {
*target = c.pc
return true
}
return false
}
// CloseWithError closes the connection ignoring the error code. As there's no way to signal
// the remote peer on closing the underlying peerconnection, we ignore the error code.
func (c *connection) CloseWithError(_ network.ConnErrorCode) error {

View File

@@ -89,3 +89,15 @@ func (c *conn) Transport() tpt.Transport { return c.transport }
func (c *conn) ConnState() network.ConnectionState {
return network.ConnectionState{Transport: "webtransport"}
}
func (c *conn) As(target any) bool {
if target, ok := target.(**quic.Conn); ok {
*target = c.qconn
return true
}
if target, ok := target.(**webtransport.Session); ok {
*target = c.session
return true
}
return false
}