feat(simlibp2p): Simulated libp2p Networks (#3262)

This commit is contained in:
Marco Munizaga
2025-08-15 09:44:03 -07:00
committed by GitHub
parent 8e10f84961
commit 9f5945e189
12 changed files with 445 additions and 744 deletions

View File

@@ -9,6 +9,10 @@ runs:
shell: bash
# This matches only tests with "NoCover" in their test name to avoid running all tests again.
run: go test -tags nocover -run NoCover -v ./...
- name: Run synctests tests. These are tests that require go 1.24 and the experimental testing/synctest package
shell: bash
if: ${{ contains(matrix.go, '1.24') }}
run: go test -tags goexperiment.synctest -run "_synctest$" -v ./...
- name: Install testing tools
shell: bash
run: cd scripts/test_analysis && go install ./cmd/gotest2sql

1
go.mod
View File

@@ -32,6 +32,7 @@ require (
github.com/libp2p/go-reuseport v0.4.0
github.com/libp2p/go-yamux/v5 v5.0.1
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/marcopolo/simnet v0.0.1
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
github.com/mr-tron/base58 v1.2.0

2
go.sum
View File

@@ -122,6 +122,8 @@ github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv
github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/marcopolo/simnet v0.0.1 h1:rSMslhPz6q9IvJeFWDoMGxMIrlsbXau3NkuIXHGJxfg=
github.com/marcopolo/simnet v0.0.1/go.mod h1:WDaQkgLAjqDUEBAOXz22+1j6wXKfGlC5sD5XWt3ddOs=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=

View File

@@ -1,179 +0,0 @@
package simconn
import (
"errors"
"fmt"
"net"
"sync"
"time"
)
type PacketReciever interface {
RecvPacket(p Packet)
}
// PerfectRouter is a router that has no latency or jitter and can route to
// every node
type PerfectRouter struct {
mu sync.Mutex
nodes map[net.Addr]PacketReciever
}
// SendPacket implements Router.
func (r *PerfectRouter) SendPacket(p Packet) error {
r.mu.Lock()
defer r.mu.Unlock()
conn, ok := r.nodes[p.To]
if !ok {
return errors.New("unknown destination")
}
conn.RecvPacket(p)
return nil
}
func (r *PerfectRouter) AddNode(addr net.Addr, conn PacketReciever) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
r.nodes = make(map[net.Addr]PacketReciever)
}
r.nodes[addr] = conn
}
func (r *PerfectRouter) RemoveNode(addr net.Addr) {
delete(r.nodes, addr)
}
var _ Router = &PerfectRouter{}
type DelayedPacketReciever struct {
inner PacketReciever
delay time.Duration
}
func (r *DelayedPacketReciever) RecvPacket(p Packet) {
time.AfterFunc(r.delay, func() { r.inner.RecvPacket(p) })
}
type FixedLatencyRouter struct {
PerfectRouter
latency time.Duration
}
func (r *FixedLatencyRouter) SendPacket(p Packet) error {
return r.PerfectRouter.SendPacket(p)
}
func (r *FixedLatencyRouter) AddNode(addr net.Addr, conn PacketReciever) {
r.PerfectRouter.AddNode(addr, &DelayedPacketReciever{
inner: conn,
delay: r.latency,
})
}
var _ Router = &FixedLatencyRouter{}
type simpleNodeFirewall struct {
mu sync.Mutex
publiclyReachable bool
packetsOutTo map[string]struct{}
node *SimConn
}
func (f *simpleNodeFirewall) MarkPacketSentOut(p Packet) {
f.mu.Lock()
defer f.mu.Unlock()
if f.packetsOutTo == nil {
f.packetsOutTo = make(map[string]struct{})
}
f.packetsOutTo[p.To.String()] = struct{}{}
}
func (f *simpleNodeFirewall) IsPacketInAllowed(p Packet) bool {
f.mu.Lock()
defer f.mu.Unlock()
if f.publiclyReachable {
return true
}
_, ok := f.packetsOutTo[p.From.String()]
return ok
}
func (f *simpleNodeFirewall) String() string {
return fmt.Sprintf("public: %v, packetsOutTo: %v", f.publiclyReachable, f.packetsOutTo)
}
type SimpleFirewallRouter struct {
mu sync.Mutex
nodes map[string]*simpleNodeFirewall
}
func (r *SimpleFirewallRouter) String() string {
r.mu.Lock()
defer r.mu.Unlock()
nodes := make([]string, 0, len(r.nodes))
for _, node := range r.nodes {
nodes = append(nodes, node.String())
}
return fmt.Sprintf("%v", nodes)
}
func (r *SimpleFirewallRouter) SendPacket(p Packet) error {
r.mu.Lock()
defer r.mu.Unlock()
toNode, exists := r.nodes[p.To.String()]
if !exists {
return errors.New("unknown destination")
}
// Record that this node is sending a packet to the destination
fromNode, exists := r.nodes[p.From.String()]
if !exists {
return errors.New("unknown source")
}
fromNode.MarkPacketSentOut(p)
if !toNode.IsPacketInAllowed(p) {
return nil // Silently drop blocked packets
}
toNode.node.RecvPacket(p)
return nil
}
func (r *SimpleFirewallRouter) AddNode(addr net.Addr, conn *SimConn) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
r.nodes = make(map[string]*simpleNodeFirewall)
}
r.nodes[addr.String()] = &simpleNodeFirewall{
packetsOutTo: make(map[string]struct{}),
node: conn,
}
}
func (r *SimpleFirewallRouter) AddPubliclyReachableNode(addr net.Addr, conn *SimConn) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
r.nodes = make(map[string]*simpleNodeFirewall)
}
r.nodes[addr.String()] = &simpleNodeFirewall{
publiclyReachable: true,
node: conn,
}
}
func (r *SimpleFirewallRouter) RemoveNode(addr net.Addr) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
return
}
delete(r.nodes, addr.String())
}
var _ Router = &SimpleFirewallRouter{}

View File

@@ -1,218 +0,0 @@
package simconn
import (
"errors"
"net"
"slices"
"sync"
"sync/atomic"
"time"
)
var ErrDeadlineExceeded = errors.New("deadline exceeded")
type Router interface {
SendPacket(p Packet) error
}
type Packet struct {
To net.Addr
From net.Addr
buf []byte
}
type SimConn struct {
mu sync.Mutex
closed bool
closedChan chan struct{}
packetsSent atomic.Uint64
packetsRcvd atomic.Uint64
bytesSent atomic.Int64
bytesRcvd atomic.Int64
router Router
myAddr *net.UDPAddr
myLocalAddr net.Addr
packetsToRead chan Packet
readDeadline time.Time
writeDeadline time.Time
}
// NewSimConn creates a new simulated connection with the specified parameters
func NewSimConn(addr *net.UDPAddr, rtr Router) *SimConn {
return &SimConn{
router: rtr,
myAddr: addr,
packetsToRead: make(chan Packet, 512), // buffered channel to prevent blocking
closedChan: make(chan struct{}),
}
}
type ConnStats struct {
BytesSent int
BytesRcvd int
PacketsSent int
PacketsRcvd int
}
func (c *SimConn) Stats() ConnStats {
return ConnStats{
BytesSent: int(c.bytesSent.Load()),
BytesRcvd: int(c.bytesRcvd.Load()),
PacketsSent: int(c.packetsSent.Load()),
PacketsRcvd: int(c.packetsRcvd.Load()),
}
}
// SetLocalAddr only changes what `.LocalAddr()` returns.
// Packets will still come From the initially configured addr.
func (c *SimConn) SetLocalAddr(addr net.Addr) {
c.myLocalAddr = addr
}
func (c *SimConn) RecvPacket(p Packet) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return
}
c.mu.Unlock()
c.packetsRcvd.Add(1)
c.bytesRcvd.Add(int64(len(p.buf)))
select {
case c.packetsToRead <- p:
default:
// drop the packet if the channel is full
}
}
var _ net.PacketConn = &SimConn{}
// Close implements net.PacketConn
func (c *SimConn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
c.closed = true
close(c.closedChan)
return nil
}
// ReadFrom implements net.PacketConn
func (c *SimConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return 0, nil, net.ErrClosed
}
deadline := c.readDeadline
c.mu.Unlock()
if !deadline.IsZero() && time.Now().After(deadline) {
return 0, nil, ErrDeadlineExceeded
}
var pkt Packet
if !deadline.IsZero() {
select {
case pkt = <-c.packetsToRead:
case <-time.After(time.Until(deadline)):
return 0, nil, ErrDeadlineExceeded
}
} else {
pkt = <-c.packetsToRead
}
n = copy(p, pkt.buf)
// if the provided buffer is not enough to read the whole packet, we drop
// the rest of the data. this is similar to what `recvfrom` does on Linux
// and macOS.
return n, pkt.From, nil
}
// WriteTo implements net.PacketConn
func (c *SimConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return 0, net.ErrClosed
}
deadline := c.writeDeadline
c.mu.Unlock()
if !deadline.IsZero() && time.Now().After(deadline) {
return 0, ErrDeadlineExceeded
}
c.packetsSent.Add(1)
c.bytesSent.Add(int64(len(p)))
pkt := Packet{
From: c.myAddr,
To: addr,
buf: slices.Clone(p),
}
return len(p), c.router.SendPacket(pkt)
}
func (c *SimConn) UnicastAddr() net.Addr {
return c.myAddr
}
// LocalAddr implements net.PacketConn
func (c *SimConn) LocalAddr() net.Addr {
if c.myLocalAddr != nil {
return c.myLocalAddr
}
return c.myAddr
}
// SetDeadline implements net.PacketConn
func (c *SimConn) SetDeadline(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
c.readDeadline = t
c.writeDeadline = t
return nil
}
// SetReadDeadline implements net.PacketConn
func (c *SimConn) SetReadDeadline(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
c.readDeadline = t
return nil
}
// SetWriteDeadline implements net.PacketConn
func (c *SimConn) SetWriteDeadline(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
c.writeDeadline = t
return nil
}
func IntToPublicIPv4(n int) net.IP {
n += 1
// Avoid private IP ranges
b := make([]byte, 4)
b[0] = byte((n>>24)&0xFF | 1)
b[1] = byte((n >> 16) & 0xFF)
b[2] = byte((n >> 8) & 0xFF)
b[3] = byte(n & 0xFF)
ip := net.IPv4(b[0], b[1], b[2], b[3])
// Check and modify if it's in private ranges
if ip.IsPrivate() {
b[0] = 1 // Use 1.x.x.x as public range
}
return ip
}

View File

@@ -1,315 +0,0 @@
package simconn
import (
"bytes"
"net"
"sync"
"testing"
"testing/quick"
"time"
"github.com/stretchr/testify/require"
)
func TestSimConnBasicConnectivity(t *testing.T) {
router := &PerfectRouter{}
// Create two endpoints
addr1 := &net.UDPAddr{IP: IntToPublicIPv4(1), Port: 1234}
addr2 := &net.UDPAddr{IP: IntToPublicIPv4(2), Port: 1234}
conn1 := NewSimConn(addr1, router)
conn2 := NewSimConn(addr2, router)
router.AddNode(addr1, conn1)
router.AddNode(addr2, conn2)
// Test sending data from conn1 to conn2
testData := []byte("hello world")
n, err := conn1.WriteTo(testData, addr2)
require.NoError(t, err)
require.Equal(t, len(testData), n)
// Read data from conn2
buf := make([]byte, 1024)
n, addr, err := conn2.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, testData, buf[:n])
require.Equal(t, addr1, addr)
// Check stats
stats1 := conn1.Stats()
require.Equal(t, len(testData), stats1.BytesSent)
require.Equal(t, 1, stats1.PacketsSent)
stats2 := conn2.Stats()
require.Equal(t, len(testData), stats2.BytesRcvd)
require.Equal(t, 1, stats2.PacketsRcvd)
}
func TestSimConnDeadlines(t *testing.T) {
router := &PerfectRouter{}
addr1 := &net.UDPAddr{IP: IntToPublicIPv4(1), Port: 1234}
conn := NewSimConn(addr1, router)
router.AddNode(addr1, conn)
t.Run("read deadline", func(t *testing.T) {
deadline := time.Now().Add(10 * time.Millisecond)
err := conn.SetReadDeadline(deadline)
require.NoError(t, err)
buf := make([]byte, 1024)
_, _, err = conn.ReadFrom(buf)
require.ErrorIs(t, err, ErrDeadlineExceeded)
})
t.Run("write deadline", func(t *testing.T) {
deadline := time.Now().Add(-time.Second) // Already expired
err := conn.SetWriteDeadline(deadline)
require.NoError(t, err)
_, err = conn.WriteTo([]byte("test"), &net.UDPAddr{})
require.ErrorIs(t, err, ErrDeadlineExceeded)
})
}
func TestSimConnClose(t *testing.T) {
router := &PerfectRouter{}
addr1 := &net.UDPAddr{IP: IntToPublicIPv4(1), Port: 1234}
conn := NewSimConn(addr1, router)
router.AddNode(addr1, conn)
err := conn.Close()
require.NoError(t, err)
// Verify operations fail after close
_, err = conn.WriteTo([]byte("test"), addr1)
require.ErrorIs(t, err, net.ErrClosed)
buf := make([]byte, 1024)
_, _, err = conn.ReadFrom(buf)
require.ErrorIs(t, err, net.ErrClosed)
// Second close should not error
err = conn.Close()
require.NoError(t, err)
}
func TestSimConnLocalAddr(t *testing.T) {
router := &PerfectRouter{}
addr1 := &net.UDPAddr{IP: IntToPublicIPv4(1), Port: 1234}
conn := NewSimConn(addr1, router)
// Test default local address
require.Equal(t, addr1, conn.LocalAddr())
// Test setting custom local address
customAddr := &net.UDPAddr{IP: IntToPublicIPv4(3), Port: 5678}
conn.SetLocalAddr(customAddr)
require.Equal(t, customAddr, conn.LocalAddr())
}
func TestSimConnDeadlinesWithLatency(t *testing.T) {
router := &FixedLatencyRouter{
PerfectRouter: PerfectRouter{},
latency: 100 * time.Millisecond,
}
addr1 := &net.UDPAddr{IP: IntToPublicIPv4(1), Port: 1234}
addr2 := &net.UDPAddr{IP: IntToPublicIPv4(2), Port: 1234}
conn1 := NewSimConn(addr1, router)
conn2 := NewSimConn(addr2, router)
router.AddNode(addr1, conn1)
router.AddNode(addr2, conn2)
reset := func() {
router.RemoveNode(addr1)
router.RemoveNode(addr2)
conn1 = NewSimConn(addr1, router)
conn2 = NewSimConn(addr2, router)
router.AddNode(addr1, conn1)
router.AddNode(addr2, conn2)
}
t.Run("write succeeds within deadline", func(t *testing.T) {
deadline := time.Now().Add(200 * time.Millisecond)
err := conn1.SetWriteDeadline(deadline)
require.NoError(t, err)
n, err := conn1.WriteTo([]byte("test"), addr2)
require.NoError(t, err)
require.Equal(t, 4, n)
reset()
})
t.Run("write fails after past deadline", func(t *testing.T) {
deadline := time.Now().Add(-time.Second) // Already expired
err := conn1.SetWriteDeadline(deadline)
require.NoError(t, err)
_, err = conn1.WriteTo([]byte("test"), addr2)
require.ErrorIs(t, err, ErrDeadlineExceeded)
reset()
})
t.Run("read succeeds within deadline", func(t *testing.T) {
// Reset deadline and send a message
conn2.SetReadDeadline(time.Time{})
testData := []byte("hello")
deadline := time.Now().Add(200 * time.Millisecond)
conn1.SetWriteDeadline(deadline)
_, err := conn1.WriteTo(testData, addr2)
require.NoError(t, err)
// Set read deadline and try to read
deadline = time.Now().Add(200 * time.Millisecond)
err = conn2.SetReadDeadline(deadline)
require.NoError(t, err)
buf := make([]byte, 1024)
n, addr, err := conn2.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, addr1, addr)
require.Equal(t, testData, buf[:n])
reset()
})
t.Run("read fails after deadline", func(t *testing.T) {
defer reset()
// Set a short deadline
deadline := time.Now().Add(50 * time.Millisecond) // Less than router latency
err := conn2.SetReadDeadline(deadline)
require.NoError(t, err)
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
// Send data after setting deadline
_, err := conn1.WriteTo([]byte("test"), addr2)
require.NoError(t, err)
}()
// Read should fail due to deadline
buf := make([]byte, 1024)
_, _, err = conn2.ReadFrom(buf)
require.ErrorIs(t, err, ErrDeadlineExceeded)
})
}
func TestSimpleHolePunch(t *testing.T) {
router := &SimpleFirewallRouter{
nodes: make(map[string]*simpleNodeFirewall),
}
// Create two peers
addr1 := &net.UDPAddr{IP: IntToPublicIPv4(1), Port: 1234}
addr2 := &net.UDPAddr{IP: IntToPublicIPv4(2), Port: 1234}
peer1 := NewSimConn(addr1, router)
peer2 := NewSimConn(addr2, router)
router.AddNode(addr1, peer1)
router.AddNode(addr2, peer2)
reset := func() {
router.RemoveNode(addr1)
router.RemoveNode(addr2)
peer1 = NewSimConn(addr1, router)
peer2 = NewSimConn(addr2, router)
router.AddNode(addr1, peer1)
router.AddNode(addr2, peer2)
}
// Initially, direct communication between peer1 and peer2 should fail
t.Run("direct communication blocked initially", func(t *testing.T) {
_, err := peer1.WriteTo([]byte("direct message"), addr2)
require.NoError(t, err) // Write succeeds but packet is dropped
// Try to read from peer2
peer2.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
buf := make([]byte, 1024)
_, _, err = peer2.ReadFrom(buf)
require.ErrorIs(t, err, ErrDeadlineExceeded)
reset()
})
holePunchMsg := []byte("hole punch")
// Simulate hole punching
t.Run("hole punch and direct communication", func(t *testing.T) {
// Both peers send packets to each other simultaneously
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
_, err := peer1.WriteTo(holePunchMsg, addr2)
require.NoError(t, err)
}()
go func() {
defer wg.Done()
_, err := peer2.WriteTo(holePunchMsg, addr1)
require.NoError(t, err)
}()
wg.Wait()
// Now direct communication should work both ways
t.Run("peer1 to peer2", func(t *testing.T) {
testMsg := []byte("direct message after hole punch")
_, err := peer1.WriteTo(testMsg, addr2)
require.NoError(t, err)
buf := make([]byte, 1024)
peer2.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, addr, err := peer2.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, addr1, addr)
if bytes.Equal(buf[:n], holePunchMsg) {
// Read again to get the actual message
n, addr, err = peer2.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, addr1, addr)
}
require.Equal(t, string(testMsg), string(buf[:n]))
})
t.Run("peer2 to peer1", func(t *testing.T) {
testMsg := []byte("response from peer2")
_, err := peer2.WriteTo(testMsg, addr1)
require.NoError(t, err)
buf := make([]byte, 1024)
peer1.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, addr, err := peer1.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, addr2, addr)
if bytes.Equal(buf[:n], holePunchMsg) {
// Read again to get the actual message
n, addr, err = peer1.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, addr2, addr)
}
require.Equal(t, string(testMsg), string(buf[:n]))
})
})
}
func TestPublicIP(t *testing.T) {
err := quick.Check(func(n int) bool {
ip := IntToPublicIPv4(n)
return !ip.IsPrivate()
}, nil)
require.NoError(t, err)
}

View File

@@ -15,13 +15,13 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/net/simconn"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
holepunch_pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/marcopolo/simnet"
"go.uber.org/fx"
"github.com/libp2p/go-msgio/pbio"
@@ -84,9 +84,9 @@ func (s *mockIDService) OwnObservedAddrs() []ma.Multiaddr {
}
func TestNoHolePunchIfDirectConnExists(t *testing.T) {
router := &simconn.SimpleFirewallRouter{}
router := &simnet.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -99,14 +99,14 @@ func TestNoHolePunchIfDirectConnExists(t *testing.T) {
tr := &mockEventTracer{}
h1 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.EnableHolePunching(holepunch.DirectDialTimeout(100*time.Millisecond)),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8000/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
)
h2 := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
libp2p.ForceReachabilityPublic(),
@@ -138,9 +138,9 @@ func TestNoHolePunchIfDirectConnExists(t *testing.T) {
}
func TestDirectDialWorks(t *testing.T) {
router := &simconn.SimpleFirewallRouter{}
router := &simnet.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -154,7 +154,7 @@ func TestDirectDialWorks(t *testing.T) {
tr := &mockEventTracer{}
// h1 is public
h1 := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ForceReachabilityPublic(),
libp2p.EnableHolePunching(holepunch.DirectDialTimeout(100*time.Millisecond)),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8000/quic-v1")),
@@ -162,7 +162,7 @@ func TestDirectDialWorks(t *testing.T) {
)
h2 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
connectToRelay(&relay),
@@ -245,9 +245,9 @@ func TestEndToEndSimConnect(t *testing.T) {
h1tr := &mockEventTracer{}
h2tr := &mockEventTracer{}
router := &simconn.SimpleFirewallRouter{}
router := &simnet.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -259,7 +259,7 @@ func TestEndToEndSimConnect(t *testing.T) {
)
h1 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.EnableHolePunching(holepunch.WithTracer(h1tr), holepunch.DirectDialTimeout(100*time.Millisecond), SetLegacyBehavior(useLegacyHolePunchingBehavior)),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8000/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -267,7 +267,7 @@ func TestEndToEndSimConnect(t *testing.T) {
)
h2 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
connectToRelay(&relay),
@@ -365,9 +365,9 @@ func TestFailuresOnInitiator(t *testing.T) {
defer func() { holepunch.StreamTimeout = cpy }()
}
router := &simconn.SimpleFirewallRouter{}
router := &simnet.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -380,7 +380,7 @@ func TestFailuresOnInitiator(t *testing.T) {
// h1 does not have a holepunching service because we'll mock the holepunching stream handler below.
h1 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.ForceReachabilityPrivate(),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8000/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -388,7 +388,7 @@ func TestFailuresOnInitiator(t *testing.T) {
)
h2 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
connectToRelay(&relay),
@@ -515,9 +515,9 @@ func TestFailuresOnResponder(t *testing.T) {
opts = append(opts, holepunch.WithAddrFilter(f))
}
router := &simconn.SimpleFirewallRouter{}
router := &simnet.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -528,7 +528,7 @@ func TestFailuresOnResponder(t *testing.T) {
})),
)
h1 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.EnableHolePunching(opts...),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8000/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -537,7 +537,7 @@ func TestFailuresOnResponder(t *testing.T) {
)
h2 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1")),
libp2p.ResourceManager(&network.NullResourceManager{}),
connectToRelay(&relay),
@@ -634,7 +634,7 @@ func (m *MockSourceIPSelector) PreferredSourceIPForDestination(_ *net.UDPAddr) (
return *m.ip.Load(), nil
}
func quicSimConn(isPubliclyReachably bool, router *simconn.SimpleFirewallRouter) libp2p.Option {
func quicSimnet(isPubliclyReachably bool, router *simnet.SimpleFirewallRouter) libp2p.Option {
m := &MockSourceIPSelector{}
return libp2p.QUICReuse(
quicreuse.NewConnManager,
@@ -643,12 +643,10 @@ func quicSimConn(isPubliclyReachably bool, router *simconn.SimpleFirewallRouter)
}),
quicreuse.OverrideListenUDP(func(_ string, address *net.UDPAddr) (net.PacketConn, error) {
m.ip.Store(&address.IP)
c := simconn.NewSimConn(address, router)
if isPubliclyReachably {
router.AddPubliclyReachableNode(address, c)
} else {
router.AddNode(address, c)
router.SetAddrPubliclyReachable(address)
}
c := simnet.NewSimConn(address, router)
return c, nil
}))
}
@@ -695,9 +693,9 @@ func TestEndToEndSimConnectQUICReuse(t *testing.T) {
h1tr := &mockEventTracer{}
h2tr := &mockEventTracer{}
router := &simconn.SimpleFirewallRouter{}
router := &simnet.SimpleFirewallRouter{}
relay := MustNewHost(t,
quicSimConn(true, router),
quicSimnet(true, router),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -732,7 +730,7 @@ func TestEndToEndSimConnectQUICReuse(t *testing.T) {
}
h1 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.EnableHolePunching(holepunch.WithTracer(h1tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1/webtransport")),
libp2p.ResourceManager(&network.NullResourceManager{}),
@@ -748,7 +746,7 @@ func TestEndToEndSimConnectQUICReuse(t *testing.T) {
)
h2 := MustNewHost(t,
quicSimConn(false, router),
quicSimnet(false, router),
libp2p.ListenAddrs(
ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1/webtransport"),
),

View File

@@ -68,6 +68,8 @@ type ConnManager struct {
connContext connContextFunc
verifySourceAddress func(addr net.Addr) bool
qlogTracerDir string
}
type quicListenerEntry struct {
@@ -148,8 +150,14 @@ func (c *ConnManager) getTracer() func(context.Context, quiclogging.Perspective,
}
}
var tracer *quiclogging.ConnectionTracer
if qlogTracerDir != "" {
tracer = qloggerForDir(qlogTracerDir, p, ci)
var tracerDir = c.qlogTracerDir
if tracerDir == "" {
// Fallback to the global qlogTracerDir
tracerDir = qlogTracerDir
}
if tracerDir != "" {
tracer = qloggerForDir(tracerDir, p, ci)
if promTracer != nil {
tracer = quiclogging.NewMultiplexedConnectionTracer(promTracer,
tracer)

View File

@@ -27,6 +27,13 @@ func OverrideSourceIPSelector(f func() (SourceIPSelector, error)) Option {
}
}
func WithQlogTracerDir(dir string) Option {
return func(m *ConnManager) error {
m.qlogTracerDir = dir
return nil
}
}
func DisableReuseport() Option {
return func(m *ConnManager) error {
m.enableReuseport = false

View File

@@ -118,6 +118,8 @@ github.com/libp2p/go-yamux/v5 v5.0.1 h1:f0WoX/bEF2E8SbE4c/k1Mo+/9z0O4oC/hWEA+nfY
github.com/libp2p/go-yamux/v5 v5.0.1/go.mod h1:en+3cdX51U0ZslwRdRLrvQsdayFt3TSUKvBGErzpWbU=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/marcopolo/simnet v0.0.1 h1:rSMslhPz6q9IvJeFWDoMGxMIrlsbXau3NkuIXHGJxfg=
github.com/marcopolo/simnet v0.0.1/go.mod h1:WDaQkgLAjqDUEBAOXz22+1j6wXKfGlC5sD5XWt3ddOs=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=

258
x/simlibp2p/libp2p.go Normal file
View File

@@ -0,0 +1,258 @@
package simconnlibp2p
import (
"crypto/rand"
"fmt"
"net"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/marcopolo/simnet"
"github.com/multiformats/go-multiaddr"
"github.com/quic-go/quic-go"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
)
func MustNewHost(t *testing.T, opts ...libp2p.Option) host.Host {
t.Helper()
h, err := libp2p.New(opts...)
require.NoError(t, err)
return h
}
type MockSourceIPSelector struct {
ip atomic.Pointer[net.IP]
}
func (m *MockSourceIPSelector) PreferredSourceIPForDestination(_ *net.UDPAddr) (net.IP, error) {
return *m.ip.Load(), nil
}
const OneMbps = 1_000_000
func QUICSimnet(simnet *simnet.Simnet, linkSettings simnet.NodeBiDiLinkSettings, quicReuseOpts ...quicreuse.Option) libp2p.Option {
m := &MockSourceIPSelector{}
quicReuseOpts = append(quicReuseOpts,
quicreuse.OverrideSourceIPSelector(func() (quicreuse.SourceIPSelector, error) {
return m, nil
}),
quicreuse.OverrideListenUDP(func(_ string, address *net.UDPAddr) (net.PacketConn, error) {
m.ip.Store(&address.IP)
c := simnet.NewEndpoint(address, linkSettings)
return c, nil
}))
return libp2p.QUICReuse(
func(l fx.Lifecycle, statelessResetKey quic.StatelessResetKey, tokenKey quic.TokenGeneratorKey, opts ...quicreuse.Option) (*quicreuse.ConnManager, error) {
cm, err := quicreuse.NewConnManager(statelessResetKey, tokenKey, opts...)
if err != nil {
return nil, err
}
l.Append(fx.StopHook(func() error {
// When we pass in our own conn manager, we need to close it manually (??)
// TODO: this seems like a bug
return cm.Close()
}))
return cm, nil
}, quicReuseOpts...)
}
type wrappedHost struct {
blankhost.BlankHost
ps peerstore.Peerstore
quicCM *quicreuse.ConnManager
idService identify.IDService
connMgr *connmgr.BasicConnMgr
}
func (h *wrappedHost) Close() error {
h.BlankHost.Close()
h.ps.Close()
h.quicCM.Close()
h.idService.Close()
h.connMgr.Close()
return nil
}
type BlankHostOpts struct {
ConnMgr *connmgr.BasicConnMgr
listenMultiaddr multiaddr.Multiaddr
simnet *simnet.Simnet
linkSettings simnet.NodeBiDiLinkSettings
quicReuseOpts []quicreuse.Option
}
func newBlankHost(opts BlankHostOpts) (*wrappedHost, error) {
priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
id, err := peer.IDFromPrivateKey(priv)
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
ps.AddPrivKey(id, priv)
eb := eventbus.NewBus()
swarm, err := swarm.NewSwarm(id, ps, eb)
if err != nil {
return nil, err
}
statelessResetKey, err := config.PrivKeyToStatelessResetKey(priv)
if err != nil {
return nil, err
}
tokenGeneratorKey, err := config.PrivKeyToTokenGeneratorKey(priv)
if err != nil {
return nil, err
}
m := &MockSourceIPSelector{}
quicReuseOpts := append(opts.quicReuseOpts,
quicreuse.OverrideSourceIPSelector(func() (quicreuse.SourceIPSelector, error) {
return m, nil
}),
quicreuse.OverrideListenUDP(func(_ string, address *net.UDPAddr) (net.PacketConn, error) {
m.ip.Store(&address.IP)
c := opts.simnet.NewEndpoint(address, opts.linkSettings)
return c, nil
}),
)
quicCM, err := quicreuse.NewConnManager(statelessResetKey, tokenGeneratorKey, quicReuseOpts...)
if err != nil {
return nil, err
}
quicTr, err := libp2pquic.NewTransport(priv, quicCM, nil, nil, &network.NullResourceManager{})
if err != nil {
return nil, err
}
err = swarm.AddTransport(quicTr)
if err != nil {
return nil, err
}
err = swarm.Listen(opts.listenMultiaddr)
if err != nil {
return nil, err
}
var cm *connmgr.BasicConnMgr
if opts.ConnMgr == nil {
cm, err = connmgr.NewConnManager(100, 200, connmgr.WithGracePeriod(time.Second*10))
if err != nil {
return nil, err
}
} else {
cm = opts.ConnMgr
}
host := blankhost.NewBlankHost(swarm, blankhost.WithEventBus(eb), blankhost.WithConnectionManager(cm))
idService, err := identify.NewIDService(host, identify.DisableObservedAddrManager())
if err != nil {
return nil, err
}
idService.Start()
return &wrappedHost{
BlankHost: *host,
ps: ps,
quicCM: quicCM,
idService: idService,
connMgr: cm,
}, nil
}
type NodeLinkSettingsAndCount struct {
LinkSettings simnet.NodeBiDiLinkSettings
Count int
}
type HostAndIdx struct {
Host host.Host
Idx int
}
type SimpleLibp2pNetworkMeta struct {
Nodes []host.Host
AddrToNode map[string]HostAndIdx
}
type NetworkSettings struct {
UseBlankHost bool
QUICReuseOptsForHostIdx func(idx int) []quicreuse.Option
BlankHostOptsForHostIdx func(idx int) BlankHostOpts
}
func SimpleLibp2pNetwork(linkSettings []NodeLinkSettingsAndCount, networkSettings NetworkSettings) (*simnet.Simnet, *SimpleLibp2pNetworkMeta, error) {
nw := &simnet.Simnet{}
meta := &SimpleLibp2pNetworkMeta{
AddrToNode: make(map[string]HostAndIdx),
}
for _, l := range linkSettings {
for i := 0; i < l.Count; i++ {
idx := len(meta.Nodes)
ip := simnet.IntToPublicIPv4(idx)
addr := fmt.Sprintf("/ip4/%s/udp/8000/quic-v1", ip)
var h host.Host
var err error
var quicReuseOpts []quicreuse.Option
if networkSettings.QUICReuseOptsForHostIdx != nil {
quicReuseOpts = networkSettings.QUICReuseOptsForHostIdx(idx)
}
if networkSettings.UseBlankHost {
var opts BlankHostOpts
if networkSettings.BlankHostOptsForHostIdx != nil {
opts = networkSettings.BlankHostOptsForHostIdx(idx)
}
h, err = newBlankHost(BlankHostOpts{
listenMultiaddr: multiaddr.StringCast(addr),
simnet: nw,
linkSettings: l.LinkSettings,
quicReuseOpts: quicReuseOpts,
ConnMgr: opts.ConnMgr,
})
} else {
h, err = libp2p.New(
libp2p.ListenAddrStrings(addr),
QUICSimnet(nw, l.LinkSettings, quicReuseOpts...),
// TODO: Currently using identify address discovery stalls
// synctest
libp2p.DisableIdentifyAddressDiscovery(),
libp2p.ResourceManager(&network.NullResourceManager{}),
)
}
if err != nil {
return nil, nil, err
}
meta.Nodes = append(meta.Nodes, h)
meta.AddrToNode[addr] = HostAndIdx{Host: h, Idx: idx}
}
}
return nw, meta, nil
}

View File

@@ -0,0 +1,133 @@
//go:build goexperiment.synctest
package simconnlibp2p_test
import (
"context"
"math/rand"
"testing"
"time"
"testing/synctest"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
simlibp2p "github.com/libp2p/go-libp2p/x/simlibp2p"
"github.com/marcopolo/simnet"
"github.com/stretchr/testify/require"
)
func TestSimpleLibp2pNetwork_synctest(t *testing.T) {
synctest.Run(func() {
latency := 10 * time.Millisecond
network, meta, err := simlibp2p.SimpleLibp2pNetwork([]simlibp2p.NodeLinkSettingsAndCount{
{LinkSettings: simnet.NodeBiDiLinkSettings{
Downlink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2}, // Divide by two since this is latency for each direction
Uplink: simnet.LinkSettings{BitsPerSecond: 20 * simlibp2p.OneMbps, Latency: latency / 2},
}, Count: 100},
}, simlibp2p.NetworkSettings{})
require.NoError(t, err)
network.Start()
defer network.Close()
defer func() {
for _, node := range meta.Nodes {
node.Close()
}
}()
// Test random nodes can ping each other
const numQueries = 100
for range numQueries {
i := rand.Intn(len(meta.Nodes))
j := rand.Intn(len(meta.Nodes))
for i == j {
j = rand.Intn(len(meta.Nodes))
}
h1 := meta.Nodes[i]
h2 := meta.Nodes[j]
t.Logf("connecting %s <-> %s", h1.ID(), h2.ID())
err := h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)
pingA := ping.NewPingService(h1)
ping.NewPingService(h2)
time.Sleep(1 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
t.Logf("pinging %s <-> %s", h1.ID(), h2.ID())
res := pingA.Ping(ctx, meta.Nodes[j].ID())
result := <-res
t.Logf("pinged %s <-> %s", h1.ID(), h2.ID())
require.NoError(t, result.Error)
t.Logf("ping: (%d) <-> (%d): %v", i, j, result.RTT)
expectedLatency := 20 * time.Millisecond // RTT is the sum of the latency of the two links
percentDiff := float64(result.RTT-expectedLatency) / float64(expectedLatency)
if percentDiff > 0.20 {
t.Fatalf("latency is wrong: %v. percent off: %v", result.RTT, percentDiff)
}
}
})
}
func TestSimpleSimNetPing_synctest(t *testing.T) {
synctest.Run(func() {
router := &simnet.Simnet{}
const bandwidth = 10 * simlibp2p.OneMbps
const latency = 10 * time.Millisecond
linkSettings := simnet.NodeBiDiLinkSettings{
Downlink: simnet.LinkSettings{
BitsPerSecond: bandwidth,
Latency: latency / 2,
},
Uplink: simnet.LinkSettings{
BitsPerSecond: bandwidth,
Latency: latency / 2,
},
}
hostA := simlibp2p.MustNewHost(t,
libp2p.ListenAddrStrings("/ip4/1.0.0.1/udp/8000/quic-v1"),
libp2p.DisableIdentifyAddressDiscovery(),
simlibp2p.QUICSimnet(router, linkSettings),
)
hostB := simlibp2p.MustNewHost(t,
libp2p.ListenAddrStrings("/ip4/1.0.0.2/udp/8000/quic-v1"),
libp2p.DisableIdentifyAddressDiscovery(),
simlibp2p.QUICSimnet(router, linkSettings),
)
err := router.Start()
require.NoError(t, err)
defer router.Close()
defer hostA.Close()
defer hostB.Close()
err = hostA.Connect(context.Background(), peer.AddrInfo{
ID: hostB.ID(),
Addrs: hostB.Addrs(),
})
require.NoError(t, err)
pingA := ping.NewPingService(hostA)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
res := pingA.Ping(ctx, hostB.ID())
result := <-res
require.NoError(t, result.Error)
t.Logf("pingA -> pingB: %v", result.RTT)
expectedLatency := latency * 2 // RTT is the sum of the latency of the two links
percentDiff := float64(result.RTT-expectedLatency) / float64(expectedLatency)
if percentDiff > 0.20 {
t.Fatalf("latency is wrong: %v. percent off: %v", result.RTT, percentDiff)
}
})
}