mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-09-26 20:21:26 +08:00
upgrader: absorb SSMuxer into the upgrader
This commit is contained in:
@@ -14,6 +14,8 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/pnet"
|
||||
"github.com/libp2p/go-libp2p/core/routing"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/autonat"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
|
||||
@@ -167,20 +169,9 @@ func (cfg *Config) addTransports(h host.Host) error {
|
||||
return fmt.Errorf("swarm does not support transports")
|
||||
}
|
||||
|
||||
var security []fx.Option
|
||||
if cfg.Insecure {
|
||||
security = append(security, fx.Provide(makeInsecureTransport))
|
||||
} else {
|
||||
security = cfg.SecurityTransports
|
||||
}
|
||||
|
||||
fxopts := []fx.Option{
|
||||
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
|
||||
fx.Provide(tptu.New),
|
||||
fx.Provide(fx.Annotate(
|
||||
makeSecurityMuxer,
|
||||
fx.ParamTags(`group:"security"`),
|
||||
)),
|
||||
fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`group:"security"`))),
|
||||
fx.Supply(cfg.Muxers),
|
||||
fx.Supply(h.ID()),
|
||||
fx.Provide(func() host.Host { return h }),
|
||||
@@ -191,8 +182,19 @@ func (cfg *Config) addTransports(h host.Host) error {
|
||||
fx.Provide(func() *madns.Resolver { return cfg.MultiaddrResolver }),
|
||||
}
|
||||
fxopts = append(fxopts, cfg.Transports...)
|
||||
if !cfg.Insecure {
|
||||
fxopts = append(fxopts, security...)
|
||||
if cfg.Insecure {
|
||||
fxopts = append(fxopts,
|
||||
fx.Provide(
|
||||
fx.Annotate(
|
||||
func(id peer.ID, priv crypto.PrivKey) sec.SecureTransport {
|
||||
return insecure.NewWithIdentity(insecure.ID, id, priv)
|
||||
},
|
||||
fx.ResultTags(`group:"security"`),
|
||||
),
|
||||
),
|
||||
)
|
||||
} else {
|
||||
fxopts = append(fxopts, cfg.SecurityTransports...)
|
||||
}
|
||||
|
||||
fxopts = append(fxopts, fx.Invoke(
|
||||
|
@@ -1,23 +0,0 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
|
||||
)
|
||||
|
||||
func makeInsecureTransport(id peer.ID, privKey crypto.PrivKey) sec.SecureMuxer {
|
||||
secMuxer := new(csms.SSMuxer)
|
||||
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, privKey))
|
||||
return secMuxer
|
||||
}
|
||||
|
||||
func makeSecurityMuxer(tpts []sec.SecureTransport) sec.SecureMuxer {
|
||||
secMuxer := new(csms.SSMuxer)
|
||||
for _, tpt := range tpts {
|
||||
secMuxer.AddTransport(string(tpt.ID()), tpt)
|
||||
}
|
||||
return secMuxer
|
||||
}
|
@@ -29,18 +29,3 @@ type SecureTransport interface {
|
||||
// ID is the protocol ID of the security protocol.
|
||||
ID() protocol.ID
|
||||
}
|
||||
|
||||
// A SecureMuxer is a wrapper around SecureTransport which can select security protocols
|
||||
// and open outbound connections with simultaneous open.
|
||||
type SecureMuxer interface {
|
||||
// SecureInbound secures an inbound connection.
|
||||
// The returned boolean indicates whether the connection should be treated as a server
|
||||
// connection; in the case of SecureInbound it should always be true.
|
||||
// If p is empty, connections from any peer are accepted.
|
||||
SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (SecureConn, bool, error)
|
||||
|
||||
// SecureOutbound secures an outbound connection.
|
||||
// The returned boolean indicates whether the connection should be treated as a server
|
||||
// connection due to simultaneous open.
|
||||
SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (SecureConn, bool, error)
|
||||
}
|
||||
|
@@ -1,101 +0,0 @@
|
||||
package csms
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
|
||||
mss "github.com/multiformats/go-multistream"
|
||||
)
|
||||
|
||||
// SSMuxer is a multistream stream security transport multiplexer.
|
||||
//
|
||||
// SSMuxer is safe to use without initialization. However, it's not safe to move
|
||||
// after use.
|
||||
type SSMuxer struct {
|
||||
mux mss.MultistreamMuxer
|
||||
tpts map[string]sec.SecureTransport
|
||||
OrderPreference []string
|
||||
}
|
||||
|
||||
var _ sec.SecureMuxer = (*SSMuxer)(nil)
|
||||
|
||||
// AddTransport adds a stream security transport to this multistream muxer.
|
||||
//
|
||||
// This method is *not* thread-safe. It should be called only when initializing
|
||||
// the SSMuxer.
|
||||
func (sm *SSMuxer) AddTransport(path string, transport sec.SecureTransport) {
|
||||
if sm.tpts == nil {
|
||||
sm.tpts = make(map[string]sec.SecureTransport, 1)
|
||||
}
|
||||
|
||||
sm.mux.AddHandler(path, nil)
|
||||
sm.tpts[path] = transport
|
||||
sm.OrderPreference = append(sm.OrderPreference, path)
|
||||
}
|
||||
|
||||
// SecureInbound secures an inbound connection using this multistream
|
||||
// multiplexed stream security transport.
|
||||
func (sm *SSMuxer) SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) {
|
||||
tpt, _, err := sm.selectProto(ctx, insecure, true)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
sconn, err := tpt.SecureInbound(ctx, insecure, p)
|
||||
return sconn, true, err
|
||||
}
|
||||
|
||||
// SecureOutbound secures an outbound connection using this multistream
|
||||
// multiplexed stream security transport.
|
||||
func (sm *SSMuxer) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) {
|
||||
tpt, server, err := sm.selectProto(ctx, insecure, false)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if server {
|
||||
sconn, err := tpt.SecureInbound(ctx, insecure, p)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("failed to secure inbound connection: %s", err)
|
||||
}
|
||||
return sconn, true, nil
|
||||
}
|
||||
sconn, err := tpt.SecureOutbound(ctx, insecure, p)
|
||||
return sconn, false, err
|
||||
}
|
||||
|
||||
func (sm *SSMuxer) selectProto(ctx context.Context, insecure net.Conn, server bool) (sec.SecureTransport, bool, error) {
|
||||
var proto string
|
||||
var err error
|
||||
var iamserver bool
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
if server {
|
||||
iamserver = true
|
||||
proto, _, err = sm.mux.Negotiate(insecure)
|
||||
} else {
|
||||
proto, iamserver, err = mss.SelectWithSimopenOrFail(sm.OrderPreference, insecure)
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if tpt, ok := sm.tpts[proto]; ok {
|
||||
return tpt, iamserver, nil
|
||||
}
|
||||
return nil, false, fmt.Errorf("selected unknown security transport")
|
||||
case <-ctx.Done():
|
||||
// We *must* do this. We have outstanding work on the connection
|
||||
// and it's no longer safe to use.
|
||||
insecure.Close()
|
||||
<-done // wait to stop using the connection.
|
||||
return nil, false, ctx.Err()
|
||||
}
|
||||
}
|
@@ -1,121 +0,0 @@
|
||||
package csms
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func newPeer(t *testing.T) (crypto.PrivKey, peer.ID) {
|
||||
priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
|
||||
require.NoError(t, err)
|
||||
id, err := peer.IDFromPrivateKey(priv)
|
||||
require.NoError(t, err)
|
||||
return priv, id
|
||||
}
|
||||
|
||||
type TransportAdapter struct {
|
||||
mux *SSMuxer
|
||||
}
|
||||
|
||||
func (sm *TransportAdapter) SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error) {
|
||||
sconn, _, err := sm.mux.SecureInbound(ctx, insecure, p)
|
||||
return sconn, err
|
||||
}
|
||||
|
||||
func (sm *TransportAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error) {
|
||||
sconn, _, err := sm.mux.SecureOutbound(ctx, insecure, p)
|
||||
return sconn, err
|
||||
}
|
||||
|
||||
func TestCommonProto(t *testing.T) {
|
||||
privA, idA := newPeer(t)
|
||||
privB, idB := newPeer(t)
|
||||
|
||||
var at, bt SSMuxer
|
||||
|
||||
atInsecure := insecure.NewWithIdentity(insecure.ID, idA, privA)
|
||||
btInsecure := insecure.NewWithIdentity(insecure.ID, idB, privB)
|
||||
at.AddTransport("/plaintext/1.0.0", atInsecure)
|
||||
bt.AddTransport("/plaintext/1.1.0", btInsecure)
|
||||
bt.AddTransport("/plaintext/1.0.0", btInsecure)
|
||||
|
||||
ln, err := net.Listen("tcp", "localhost:0")
|
||||
require.NoError(t, err)
|
||||
muxB := &TransportAdapter{mux: &bt}
|
||||
connChan := make(chan sec.SecureConn)
|
||||
go func() {
|
||||
conn, err := ln.Accept()
|
||||
require.NoError(t, err)
|
||||
c, err := muxB.SecureInbound(context.Background(), conn, idA)
|
||||
require.NoError(t, err)
|
||||
connChan <- c
|
||||
}()
|
||||
|
||||
muxA := &TransportAdapter{mux: &at}
|
||||
|
||||
cconn, err := net.Dial("tcp", ln.Addr().String())
|
||||
require.NoError(t, err)
|
||||
|
||||
cc, err := muxA.SecureOutbound(context.Background(), cconn, idB)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, cc.LocalPeer(), idA)
|
||||
require.Equal(t, cc.RemotePeer(), idB)
|
||||
_, err = cc.Write([]byte("foobar"))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, cc.Close())
|
||||
|
||||
sc := <-connChan
|
||||
require.Equal(t, sc.LocalPeer(), idB)
|
||||
require.Equal(t, sc.RemotePeer(), idA)
|
||||
b, err := io.ReadAll(sc)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "foobar", string(b))
|
||||
}
|
||||
|
||||
func TestNoCommonProto(t *testing.T) {
|
||||
privA, idA := newPeer(t)
|
||||
privB, idB := newPeer(t)
|
||||
|
||||
var at, bt SSMuxer
|
||||
atInsecure := insecure.NewWithIdentity(insecure.ID, idA, privA)
|
||||
btInsecure := insecure.NewWithIdentity(insecure.ID, idB, privB)
|
||||
|
||||
at.AddTransport("/plaintext/1.0.0", atInsecure)
|
||||
bt.AddTransport("/plaintext/1.1.0", btInsecure)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
a, b := net.Pipe()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer a.Close()
|
||||
_, _, err := at.SecureInbound(ctx, a, "")
|
||||
if err == nil {
|
||||
t.Error("connection should have failed")
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer b.Close()
|
||||
_, _, err := bt.SecureOutbound(ctx, b, "peerA")
|
||||
if err == nil {
|
||||
t.Error("connection should have failed")
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
@@ -12,11 +12,11 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
|
||||
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
|
||||
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
|
||||
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
|
||||
@@ -75,10 +75,9 @@ func makeSwarm(t *testing.T) *Swarm {
|
||||
func makeUpgrader(t *testing.T, n *Swarm) transport.Upgrader {
|
||||
id := n.LocalPeer()
|
||||
pk := n.Peerstore().PrivKey(id)
|
||||
secMuxer := new(csms.SSMuxer)
|
||||
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, pk))
|
||||
st := insecure.NewWithIdentity(insecure.ID, id, pk)
|
||||
|
||||
u, err := tptu.New(secMuxer, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, nil)
|
||||
u, err := tptu.New([]sec.SecureTransport{st}, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
return u
|
||||
}
|
||||
|
@@ -12,11 +12,11 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
|
||||
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
|
||||
@@ -101,10 +101,9 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option {
|
||||
func GenUpgrader(t *testing.T, n *swarm.Swarm, connGater connmgr.ConnectionGater, opts ...tptu.Option) transport.Upgrader {
|
||||
id := n.LocalPeer()
|
||||
pk := n.Peerstore().PrivKey(id)
|
||||
secMuxer := new(csms.SSMuxer)
|
||||
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, pk))
|
||||
st := insecure.NewWithIdentity(insecure.ID, id, pk)
|
||||
|
||||
u, err := tptu.New(secMuxer, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, connGater, opts...)
|
||||
u, err := tptu.New([]sec.SecureTransport{st}, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, connGater, opts...)
|
||||
require.NoError(t, err)
|
||||
return u
|
||||
}
|
||||
|
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -15,6 +14,7 @@ import (
|
||||
mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
|
||||
@@ -24,22 +24,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type MuxAdapter struct {
|
||||
tpt sec.SecureTransport
|
||||
}
|
||||
|
||||
var _ sec.SecureMuxer = &MuxAdapter{}
|
||||
|
||||
func (mux *MuxAdapter) SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) {
|
||||
sconn, err := mux.tpt.SecureInbound(ctx, insecure, p)
|
||||
return sconn, true, err
|
||||
}
|
||||
|
||||
func (mux *MuxAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) {
|
||||
sconn, err := mux.tpt.SecureOutbound(ctx, insecure, p)
|
||||
return sconn, false, err
|
||||
}
|
||||
|
||||
func createListener(t *testing.T, u transport.Upgrader) transport.Listener {
|
||||
t.Helper()
|
||||
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
|
||||
@@ -401,3 +385,33 @@ func TestListenerResourceManagementDenied(t *testing.T) {
|
||||
require.NoError(t, ln.Close())
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestNoCommonSecurityProto(t *testing.T) {
|
||||
idA, privA := newPeer(t)
|
||||
idB, privB := newPeer(t)
|
||||
atInsecure := insecure.NewWithIdentity("/plaintext1", idA, privA)
|
||||
btInsecure := insecure.NewWithIdentity("/plaintext2", idB, privB)
|
||||
|
||||
ua, err := upgrader.New([]sec.SecureTransport{atInsecure}, []upgrader.StreamMuxer{{ID: "negotiate", Muxer: &negotiatingMuxer{}}}, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
ub, err := upgrader.New([]sec.SecureTransport{btInsecure}, []upgrader.StreamMuxer{{ID: "negotiate", Muxer: &negotiatingMuxer{}}}, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
ln := createListener(t, ua)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
ln.Accept()
|
||||
}()
|
||||
|
||||
_, err = dial(t, ub, ln.Multiaddrs()[0], idA, &network.NullScope{})
|
||||
require.EqualError(t, err, "failed to negotiate security protocol: protocol not supported")
|
||||
select {
|
||||
case <-done:
|
||||
t.Fatal("didn't expect to accept a connection")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
|
||||
ln.Close()
|
||||
<-done
|
||||
}
|
||||
|
@@ -49,15 +49,17 @@ type StreamMuxer struct {
|
||||
// Upgrader is a multistream upgrader that can upgrade an underlying connection
|
||||
// to a full transport connection (secure and multiplexed).
|
||||
type upgrader struct {
|
||||
secure sec.SecureMuxer
|
||||
|
||||
psk ipnet.PSK
|
||||
connGater connmgr.ConnectionGater
|
||||
rcmgr network.ResourceManager
|
||||
|
||||
msmuxer *mss.MultistreamMuxer
|
||||
muxers []StreamMuxer
|
||||
muxerIDs []string
|
||||
muxerMuxer *mss.MultistreamMuxer
|
||||
muxers []StreamMuxer
|
||||
muxerIDs []string
|
||||
|
||||
security []sec.SecureTransport
|
||||
securityMuxer *mss.MultistreamMuxer
|
||||
securityIDs []string
|
||||
|
||||
// AcceptTimeout is the maximum duration an Accept is allowed to take.
|
||||
// This includes the time between accepting the raw network connection,
|
||||
@@ -69,15 +71,16 @@ type upgrader struct {
|
||||
|
||||
var _ transport.Upgrader = &upgrader{}
|
||||
|
||||
func New(secureMuxer sec.SecureMuxer, muxers []StreamMuxer, psk ipnet.PSK, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...Option) (transport.Upgrader, error) {
|
||||
func New(security []sec.SecureTransport, muxers []StreamMuxer, psk ipnet.PSK, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...Option) (transport.Upgrader, error) {
|
||||
u := &upgrader{
|
||||
secure: secureMuxer,
|
||||
acceptTimeout: defaultAcceptTimeout,
|
||||
rcmgr: rcmgr,
|
||||
connGater: connGater,
|
||||
psk: psk,
|
||||
msmuxer: mss.NewMultistreamMuxer(),
|
||||
muxerMuxer: mss.NewMultistreamMuxer(),
|
||||
muxers: muxers,
|
||||
security: security,
|
||||
securityMuxer: mss.NewMultistreamMuxer(),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
if err := opt(u); err != nil {
|
||||
@@ -89,9 +92,14 @@ func New(secureMuxer sec.SecureMuxer, muxers []StreamMuxer, psk ipnet.PSK, rcmgr
|
||||
}
|
||||
u.muxerIDs = make([]string, 0, len(muxers))
|
||||
for _, m := range muxers {
|
||||
u.msmuxer.AddHandler(string(m.ID), nil)
|
||||
u.muxerMuxer.AddHandler(string(m.ID), nil)
|
||||
u.muxerIDs = append(u.muxerIDs, string(m.ID))
|
||||
}
|
||||
u.securityIDs = make([]string, 0, len(security))
|
||||
for _, s := range security {
|
||||
u.securityMuxer.AddHandler(string(s.ID()), nil)
|
||||
u.securityIDs = append(u.securityIDs, string(s.ID()))
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
@@ -156,7 +164,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
|
||||
log.Errorw("failed to close connection", "peer", p, "addr", maconn.RemoteMultiaddr(), "error", err)
|
||||
}
|
||||
return nil, fmt.Errorf("gater rejected connection with peer %s and addr %s with direction %d",
|
||||
sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir)
|
||||
sconn.RemotePeer(), maconn.RemoteMultiaddr(), dir)
|
||||
}
|
||||
// Only call SetPeer if it hasn't already been set -- this can happen when we don't know
|
||||
// the peer in advance and in some bug scenarios.
|
||||
@@ -167,7 +175,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
|
||||
log.Errorw("failed to close connection", "peer", p, "addr", maconn.RemoteMultiaddr(), "error", err)
|
||||
}
|
||||
return nil, fmt.Errorf("resource manager connection with peer %s and addr %s with direction %d",
|
||||
sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir)
|
||||
sconn.RemotePeer(), maconn.RemoteMultiaddr(), dir)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,10 +198,19 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
|
||||
}
|
||||
|
||||
func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID, dir network.Direction) (sec.SecureConn, bool, error) {
|
||||
if dir == network.DirInbound {
|
||||
return u.secure.SecureInbound(ctx, conn, p)
|
||||
isServer := dir == network.DirInbound
|
||||
var st sec.SecureTransport
|
||||
var err error
|
||||
st, isServer, err = u.negotiateSecurity(ctx, conn, isServer)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return u.secure.SecureOutbound(ctx, conn, p)
|
||||
if isServer {
|
||||
sconn, err := st.SecureInbound(ctx, conn, p)
|
||||
return sconn, true, err
|
||||
}
|
||||
sconn, err := st.SecureOutbound(ctx, conn, p)
|
||||
return sconn, false, err
|
||||
}
|
||||
|
||||
func (u *upgrader) negotiateMuxer(nc net.Conn, isServer bool) (*StreamMuxer, error) {
|
||||
@@ -203,7 +220,7 @@ func (u *upgrader) negotiateMuxer(nc net.Conn, isServer bool) (*StreamMuxer, err
|
||||
|
||||
var proto string
|
||||
if isServer {
|
||||
selected, _, err := u.msmuxer.Negotiate(nc)
|
||||
selected, _, err := u.muxerMuxer.Negotiate(nc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -279,3 +296,51 @@ func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server b
|
||||
return "", nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (u *upgrader) getSecurityByID(id string) sec.SecureTransport {
|
||||
for _, s := range u.security {
|
||||
if string(s.ID()) == id {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, server bool) (sec.SecureTransport, bool, error) {
|
||||
type result struct {
|
||||
proto string
|
||||
iamserver bool
|
||||
err error
|
||||
}
|
||||
|
||||
done := make(chan result, 1)
|
||||
go func() {
|
||||
if server {
|
||||
var r result
|
||||
r.iamserver = true
|
||||
r.proto, _, r.err = u.securityMuxer.Negotiate(insecure)
|
||||
done <- r
|
||||
return
|
||||
}
|
||||
var r result
|
||||
r.proto, r.iamserver, r.err = mss.SelectWithSimopenOrFail(u.securityIDs, insecure)
|
||||
done <- r
|
||||
}()
|
||||
|
||||
select {
|
||||
case r := <-done:
|
||||
if r.err != nil {
|
||||
return nil, false, r.err
|
||||
}
|
||||
if s := u.getSecurityByID(r.proto); s != nil {
|
||||
return s, r.iamserver, nil
|
||||
}
|
||||
return nil, false, fmt.Errorf("selected unknown security transport: %s", r.proto)
|
||||
case <-ctx.Done():
|
||||
// We *must* do this. We have outstanding work on the connection
|
||||
// and it's no longer safe to use.
|
||||
insecure.Close()
|
||||
<-done // wait to stop using the connection.
|
||||
return nil, false, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
@@ -2,6 +2,7 @@ package upgrader_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"net"
|
||||
"testing"
|
||||
@@ -11,8 +12,8 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/test"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
@@ -39,12 +40,18 @@ func createUpgraderWithOpts(t *testing.T, opts ...upgrader.Option) (peer.ID, tra
|
||||
return createUpgraderWithMuxers(t, []upgrader.StreamMuxer{{ID: "negotiate", Muxer: &negotiatingMuxer{}}}, nil, nil, opts...)
|
||||
}
|
||||
|
||||
func createUpgraderWithMuxers(t *testing.T, muxers []upgrader.StreamMuxer, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...upgrader.Option) (peer.ID, transport.Upgrader) {
|
||||
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
|
||||
func newPeer(t *testing.T) (peer.ID, crypto.PrivKey) {
|
||||
t.Helper()
|
||||
priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
|
||||
require.NoError(t, err)
|
||||
id, err := peer.IDFromPrivateKey(priv)
|
||||
require.NoError(t, err)
|
||||
u, err := upgrader.New(&MuxAdapter{tpt: insecure.NewWithIdentity(insecure.ID, id, priv)}, muxers, nil, rcmgr, connGater, opts...)
|
||||
return id, priv
|
||||
}
|
||||
|
||||
func createUpgraderWithMuxers(t *testing.T, muxers []upgrader.StreamMuxer, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...upgrader.Option) (peer.ID, transport.Upgrader) {
|
||||
id, priv := newPeer(t)
|
||||
u, err := upgrader.New([]sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, priv)}, muxers, nil, rcmgr, connGater, opts...)
|
||||
require.NoError(t, err)
|
||||
return id, u
|
||||
}
|
||||
@@ -194,8 +201,4 @@ func TestOutboundResourceManagement(t *testing.T) {
|
||||
_, err := dial(t, dialUpgrader, ln.Multiaddrs()[0], id, connScope)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("blocked by the resource manager", func(t *testing.T) {
|
||||
|
||||
})
|
||||
}
|
||||
|
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
|
||||
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
|
||||
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite"
|
||||
|
||||
@@ -148,13 +147,11 @@ func TestTcpTransportCantListenUtp(t *testing.T) {
|
||||
envReuseportVal = true
|
||||
}
|
||||
|
||||
func makeInsecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) {
|
||||
func makeInsecureMuxer(t *testing.T) (peer.ID, []sec.SecureTransport) {
|
||||
t.Helper()
|
||||
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, 256)
|
||||
require.NoError(t, err)
|
||||
id, err := peer.IDFromPrivateKey(priv)
|
||||
require.NoError(t, err)
|
||||
var secMuxer csms.SSMuxer
|
||||
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, priv))
|
||||
return id, &secMuxer
|
||||
return id, []sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, priv)}
|
||||
}
|
||||
|
@@ -27,7 +27,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/test"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
|
||||
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
|
||||
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
"github.com/libp2p/go-libp2p/p2p/security/noise"
|
||||
ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite"
|
||||
@@ -56,22 +55,16 @@ func newSecureUpgrader(t *testing.T) (peer.ID, transport.Upgrader) {
|
||||
return id, u
|
||||
}
|
||||
|
||||
func newInsecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) {
|
||||
func newInsecureMuxer(t *testing.T) (peer.ID, []sec.SecureTransport) {
|
||||
t.Helper()
|
||||
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
id, err := peer.IDFromPrivateKey(priv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var secMuxer csms.SSMuxer
|
||||
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, priv))
|
||||
return id, &secMuxer
|
||||
require.NoError(t, err)
|
||||
return id, []sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, priv)}
|
||||
}
|
||||
|
||||
func newSecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) {
|
||||
func newSecureMuxer(t *testing.T) (peer.ID, []sec.SecureTransport) {
|
||||
t.Helper()
|
||||
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
|
||||
if err != nil {
|
||||
@@ -81,11 +74,9 @@ func newSecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var secMuxer csms.SSMuxer
|
||||
noiseTpt, err := noise.New(noise.ID, priv, nil)
|
||||
require.NoError(t, err)
|
||||
secMuxer.AddTransport(noise.ID, noiseTpt)
|
||||
return id, &secMuxer
|
||||
return id, []sec.SecureTransport{noiseTpt}
|
||||
}
|
||||
|
||||
func lastComponent(t *testing.T, a ma.Multiaddr) ma.Multiaddr {
|
||||
|
Reference in New Issue
Block a user