package swarm

import (
	"bytes"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"

	lgbl "github.com/ipfs/go-libp2p/loggables"
	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
	peer "github.com/ipfs/go-libp2p/p2p/peer"
	manet "gx/ipfs/QmYtzQmUwPFGxjCXctJ8e6GXS8sYfoXy2pdeMbS5SFWqRi/go-multiaddr-net"

	ma "gx/ipfs/QmR3JkmZBKYXgNMNsNZawm914455Qof3PEopwuVSeXG7aV/go-multiaddr"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials     many addrs      results to callers
//  ----------------------\    dialsync     use earliest             /--------------
//  -----------------------\              |----------\             /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                   \----------------
//  ----------------------|               \-----x                   \---------------
//                                          any may fail           if no addr at end
//                                                               retry dialAttempt x

var (
	ErrDialBackoff = errors.New("dial backoff")
	ErrDialFailed  = errors.New("dial attempt failed")
	ErrDialToSelf  = errors.New("dial to self attempted")
)

// dialAttempts governs how many times a goroutine will try to dial a given peer.
// Note: this is down to one, as we have _too many dials_ at the moment. To add
// retries back in, restore the loop in Dial(.)
const dialAttempts = 1

// number of concurrent outbound dials over transports that consume file descriptors
const concurrentFdDials = 160

// DialTimeout is the amount of time each dial attempt has. We can think about
// making this larger down the road, or putting more granular timeouts (e.g.
// within each subcomponent of Dial).
var DialTimeout time.Duration = time.Second * 10

// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
//
// this interface is so would-be dialers can just:
//
//	for {
//		c := findConnectionToPeer(peer)
//		if c != nil {
//			return c
//		}
//
//		// ok, no connections. should we dial?
//		if ok, wait := dialsync.Lock(peer); !ok {
//			<-wait // can optionally wait
//			continue
//		}
//		defer dialsync.Unlock(peer)
//
//		c := actuallyDial(peer)
//		return c
//	}
type dialsync struct {
	// ongoing is a map of tickets for the current peers being dialed.
	// this way, we don't kick off N dials simultaneously.
	ongoing map[peer.ID]chan struct{}
	lock    sync.Mutex
}

// Lock governs the beginning of a dial attempt.
// If there are no ongoing dials, it returns true, and the client is now
// scheduled to dial. Every other goroutine that calls Lock -- with the
// same dst -- will block until the client is done. The client MUST call
// ds.Unlock(p) when it is done, to unblock the other callers.
// The client is not responsible for achieving a successful dial, only for
// reporting the end of the attempt (calling ds.Unlock(p)).
//
// see the example in the doc comment on `dialsync` above.
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
	ds.lock.Lock()
	if ds.ongoing == nil { // init if not ready
		ds.ongoing = make(map[peer.ID]chan struct{})
	}
	wait, found := ds.ongoing[dst]
	if !found {
		ds.ongoing[dst] = make(chan struct{})
	}
	ds.lock.Unlock()

	if found {
		return false, wait
	}

	// ok! you're signed up to dial!
	return true, nil
}

// Unlock releases waiters to a dial attempt. see Lock.
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
	ds.lock.Lock()
	wait, found := ds.ongoing[dst]
	if !found {
		panic("called Unlock with no ongoing dials to peer: " + dst.Pretty())
	}

	delete(ds.ongoing, dst) // remove ongoing dial
	close(wait)             // release everyone else
	ds.lock.Unlock()
}

// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all attempts), we add them to
// dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff first. If the peer is there, they don't wait and exit
// promptly with an error. (the single goroutine that is actually dialing
// continues to dial). If a dial is successful, the peer is removed from
// backoff. Example:
//
//	for {
//		if ok, wait := dialsync.Lock(p); !ok {
//			if backoff.Backoff(p) {
//				return errDialFailed
//			}
//			<-wait
//			continue
//		}
//		defer dialsync.Unlock(p)
//		c, err := actuallyDial(p)
//		if err != nil {
//			dialbackoff.AddBackoff(p)
//			continue
//		}
//		dialbackoff.Clear(p)
//	}
type dialbackoff struct {
	entries map[peer.ID]*backoffPeer
	lock    sync.RWMutex
}

type backoffPeer struct {
	tries int
	until time.Time
}

func (db *dialbackoff) init() {
	if db.entries == nil {
		db.entries = make(map[peer.ID]*backoffPeer)
	}
}

// Backoff returns whether the client should backoff from dialing
// peer p
func (db *dialbackoff) Backoff(p peer.ID) (backoff bool) {
	db.lock.Lock()
	defer db.lock.Unlock()
	db.init()
	bp, found := db.entries[p]
	if found && time.Now().Before(bp.until) {
		return true
	}

	return false
}

const baseBackoffTime = time.Second * 5
const maxBackoffTime = time.Minute * 5

// AddBackoff lets other callers know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
func (db *dialbackoff) AddBackoff(p peer.ID) {
	db.lock.Lock()
	defer db.lock.Unlock()
	db.init()
	bp, ok := db.entries[p]
	if !ok {
		db.entries[p] = &backoffPeer{
			tries: 1,
			until: time.Now().Add(baseBackoffTime),
		}
		return
	}

	expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries)
	if expTimeAdd > maxBackoffTime {
		expTimeAdd = maxBackoffTime
	}
	bp.until = time.Now().Add(baseBackoffTime + expTimeAdd)
	bp.tries++
}
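
// A worked example of the schedule above (derived from the code; each
// window is measured from the AddBackoff call that set it): the Nth
// consecutive failure backs the peer off for
// baseBackoffTime + min((N-1)^2 seconds, maxBackoffTime):
//
//	failure 1: 5s + 0s  (tries becomes 1)
//	failure 2: 5s + 1s  (tries becomes 2)
//	failure 3: 5s + 4s  (tries becomes 3)
//	failure 4: 5s + 9s  (tries becomes 4)
//	...capped at 5s + 5m once (N-1)^2 seconds exceeds maxBackoffTime.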

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
func (db *dialbackoff) Clear(p peer.ID) {
	db.lock.Lock()
	defer db.lock.Unlock()
	db.init()
	delete(db.entries, p)
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it chooses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achieve connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	if p == s.local {
		log.Event(ctx, "swarmDialSelf", logdial)
		return nil, ErrDialToSelf
	}

	return s.gatedDialAttempt(ctx, p)
}
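
// A minimal caller sketch (hypothetical; the swarm construction and the
// source of `p` are assumptions, not shown in this file):
//
//	func connect(s *Swarm, p peer.ID) (*Conn, error) {
//		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
//		defer cancel()
//		c, err := s.Dial(ctx, p) // p's addrs must already be in s.peers
//		if err == ErrDialBackoff {
//			// a recent dial to p failed; back off and retry later
//		}
//		return c, err
//	}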

func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
	cs := s.ConnectionsToPeer(p)
	for _, conn := range cs {
		if conn != nil { // dump out the first one we find. (TODO pick better)
			return conn
		}
	}
	return nil
}

// gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
// dial synchronization systems: dialsync and dialbackoff.
func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done()

	// check if we already have an open connection first
	conn := s.bestConnectionToPeer(p)
	if conn != nil {
		return conn, nil
	}

	// check if there's an ongoing dial to this peer
	if ok, wait := s.dsync.Lock(p); ok {
		defer s.dsync.Unlock(p)

		// if this peer has been backed off, let's get out of here
		if s.backf.Backoff(p) {
			log.Event(ctx, "swarmDialBackoff", logdial)
			return nil, ErrDialBackoff
		}

		// ok, we have been charged to dial! let's do it.
		// if it succeeds, dial will add the conn to the swarm itself.
		defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
		ctxT, cancel := context.WithTimeout(ctx, s.dialT)
		conn, err := s.dial(ctxT, p)
		cancel()
		log.Debugf("dial end %s", conn)
		if err != nil {
			log.Event(ctx, "swarmDialBackoffAdd", logdial)
			s.backf.AddBackoff(p) // let others know to backoff

			// ok, we failed. report the error. (while dialAttempts is 1
			// there is no retry loop here; see the note on dialAttempts.)
			return nil, fmt.Errorf("dial attempt failed: %s", err)
		}
		log.Event(ctx, "swarmDialBackoffClear", logdial)
		s.backf.Clear(p) // okay, no longer need to backoff
		return conn, nil

	} else {
		// we did not dial. we must wait for someone else to dial.

		// check whether we should backoff first...
		if s.backf.Backoff(p) {
			log.Event(ctx, "swarmDialBackoff", logdial)
			return nil, ErrDialBackoff
		}

		defer log.EventBegin(ctx, "swarmDialWait", logdial).Done()
		select {
		case <-wait: // wait for that other dial to finish.
			// see if it worked, OR we got an incoming dial in the meantime...
			conn := s.bestConnectionToPeer(p)
			if conn != nil {
				return conn, nil
			}
			return nil, ErrDialFailed
		case <-ctx.Done(): // or we may have to bail...
			return nil, ctx.Err()
		}
	}
}
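
// To summarize the gates above (a reading aid, derived from the code):
//
//	1. reuse:       bestConnectionToPeer(p)  -> return an existing conn
//	2. dialsync:    s.dsync.Lock(p)          -> only one goroutine dials p
//	3. dialbackoff: s.backf.Backoff(p)       -> fail fast with ErrDialBackoff
//	4. dial:        s.dial with s.dialT timeout; success Clears the backoff,
//	                failure AddBackoffs so waiters fail fast next time.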

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	if p == s.local {
		log.Event(ctx, "swarmDialDoDialSelf", logdial)
		return nil, ErrDialToSelf
	}
	defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
	logdial["dial"] = "failure" // start off with failure. set to "success" at the end.

	sk := s.peers.PrivKey(s.local)
	logdial["encrypted"] = (sk != nil) // log whether this will be an encrypted dial or not.
	if sk == nil {
		// fine for sk to be nil, just log.
		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

	// get remote peer addrs
	remoteAddrs := s.peers.Addrs(p)
	// make sure we can use the addresses.
	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
	// drop out any addrs that would just dial ourselves. use ListenAddresses
	// as that is a more authoritative view than localAddrs.
	ila, _ := s.InterfaceListenAddresses()
	remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
	remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))

	log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
	if len(remoteAddrs) == 0 {
		err := errors.New("peer has no addresses")
		logdial["error"] = err
		return nil, err
	}

	remoteAddrs = s.filterAddrs(remoteAddrs)
	if len(remoteAddrs) == 0 {
		err := errors.New("all addresses for peer have been filtered out")
		logdial["error"] = err
		return nil, err
	}

	// try to get a connection to any addr
	connC, err := s.dialAddrs(ctx, p, remoteAddrs)
	if err != nil {
		logdial["error"] = err
		return nil, err
	}
	logdial["netconn"] = lgbl.NetConn(connC)

	// ok try to setup the new connection.
	defer log.EventBegin(ctx, "swarmDialDoSetup", logdial, lgbl.NetConn(connC)).Done()
	swarmC, err := dialConnSetup(ctx, s, connC)
	if err != nil {
		logdial["error"] = err
		connC.Close() // close the connection. didn't work out :(
		return nil, err
	}

	logdial["dial"] = "success"
	return swarmC, nil
}
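
// The address-preparation pipeline in dial, in order (a summary of the
// code above):
//
//	1. s.peers.Addrs(p)                    all addrs known for p
//	2. addrutil.FilterUsableAddrs          keep addrs our transports support
//	3. addrutil.Subtract(., listen/local)  drop addrs that would dial ourselves
//	4. s.filterAddrs                       drop addrs blocked by s.Filters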

func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {

	// sort addresses so preferred addresses are dialed sooner
	sort.Sort(AddrList(remoteAddrs))

	// try to connect to one of the peer's known addresses.
	// we dial concurrently to each of the addresses, which:
	// * makes the process faster overall
	// * attempts to get the fastest connection available.
	// * mitigates the waste of trying bad addresses
	log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancel work when we exit func

	conns := make(chan conn.Conn)
	errs := make(chan error, len(remoteAddrs))

	// dialSingleAddr is used in the rate-limited async thing below.
	dialSingleAddr := func(addr ma.Multiaddr) {
		// rebind chans in scope so we can nil them out easily
		connsout := conns
		errsout := errs

		connC, err := s.dialAddr(ctx, p, addr)
		if err != nil {
			connsout = nil
		} else if connC == nil {
			// NOTE: this really should never happen
			log.Errorf("failed to dial %s %s and got no error!", p, addr)
			err = fmt.Errorf("failed to dial %s %s", p, addr)
			connsout = nil
		} else {
			errsout = nil
		}

		// check parent still wants our results
		select {
		case <-ctx.Done():
			if connC != nil {
				connC.Close()
			}
		case errsout <- err:
		case connsout <- connC:
		}
	}

	// this whole thing is in a goroutine so the receive loop below
	// can end early, cancelling the context for the rest.
	go func() {
		limiter := make(chan struct{}, 8)
		for _, addr := range remoteAddrs {
			// returns whatever ratelimiting is acceptable for this addr.
			// may not rate limit at all.
			rl := s.addrDialRateLimit(addr)
			select {
			case <-ctx.Done(): // our context was cancelled
				return
			case rl <- struct{}{}:
				// take the token, move on
			}

			select {
			case <-ctx.Done(): // our context was cancelled
				return
			case limiter <- struct{}{}:
				// take the token, move on
			}

			go func(rlc <-chan struct{}, a ma.Multiaddr) {
				dialSingleAddr(a)
				<-limiter
				<-rlc
			}(rl, addr)
		}
	}()

	// wait for the results.
	exitErr := fmt.Errorf("failed to dial %s", p)
	for range remoteAddrs {
		select {
		case exitErr = <-errs:
			log.Debug("dial error: ", exitErr)
		case connC := <-conns:
			// take the first + return asap
			return connC, nil
		case <-ctx.Done():
			// return the context error. (a bare `break` here would only
			// exit the select, not the loop.)
			return nil, ctx.Err()
		}
	}
	return nil, exitErr
}
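
// The two tokens acquired in the producer goroutine above compose like
// this (a reading aid; the fdRateLimit capacity is set elsewhere in the
// package, presumably to concurrentFdDials):
//
//	rl := s.addrDialRateLimit(addr) // swarm-wide, fd-consuming transports only
//	rl <- struct{}{}                // acquire the fd slot (may block)
//	limiter <- struct{}{}           // acquire one of 8 per-call slots
//	go func() {
//		dialSingleAddr(addr)
//		<-limiter // release the per-call slot
//		<-rl      // release the fd slot
//	}()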

func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

	connC, err := s.dialer.Dial(ctx, addr, p)
	if err != nil {
		return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
	}

	// if the connection is not to whom we thought it would be...
	remotep := connC.RemotePeer()
	if remotep != p {
		connC.Close()
		return nil, fmt.Errorf("misdial to %s through %s (got %s)", p, addr, remotep)
	}

	// if the connection is to ourselves...
	// this can happen TONS when Loopback addrs are advertised.
	// (this should be caught by the checks above, but let's just make sure.)
	if remotep == s.local {
		connC.Close()
		return nil, fmt.Errorf("misdial to %s through %s (got self)", p, addr)
	}

	// success! we got one!
	return connC, nil
}

func (s *Swarm) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
	var out []ma.Multiaddr
	for _, a := range addrs {
		if !s.Filters.AddrBlocked(a) {
			out = append(out, a)
		}
	}
	return out
}

// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {

	psC, err := s.swarm.AddConn(connC)
	if err != nil {
		// connC is closed by caller if we fail.
		return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
	}

	// ok try to setup the new connection. (newConnSetup will add to group)
	swarmC, err := s.newConnSetup(ctx, psC)
	if err != nil {
		psC.Close() // we need to make sure psC is Closed.
		return nil, err
	}

	return swarmC, err
}

// addrDialRateLimit returns a ratelimiting channel for dialing transport
// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited.
func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} {
	if isFDCostlyTransport(a) {
		return s.fdRateLimit
	}

	// do not rate limit it at all
	return make(chan struct{}, 1)
}
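
// For illustration (addresses hypothetical, not from this file):
//
//	/ip4/1.2.3.4/tcp/4001     -> returns the shared s.fdRateLimit channel,
//	                             so tcp dials contend for fd slots swarm-wide
//	/ip4/1.2.3.4/udp/4001/utp -> returns a fresh 1-slot channel used only by
//	                             this dial, i.e. effectively unlimited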

func isFDCostlyTransport(a ma.Multiaddr) bool {
	return isTCPMultiaddr(a)
}

func isTCPMultiaddr(a ma.Multiaddr) bool {
	p := a.Protocols()
	return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
}
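
// e.g. the multiaddrs /ip4/127.0.0.1/tcp/4001 and /ip6/::1/tcp/4001 match
// isTCPMultiaddr, while /ip4/1.2.3.4/udp/4001/utp (three protocol
// components) and a bare /tcp/4001 (one component) do not.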

type AddrList []ma.Multiaddr

func (al AddrList) Len() int {
	return len(al)
}

func (al AddrList) Swap(i, j int) {
	al[i], al[j] = al[j], al[i]
}

func (al AddrList) Less(i, j int) bool {
	a := al[i]
	b := al[j]

	// dial localhost addresses first, they should fail immediately
	lba := manet.IsIPLoopback(a)
	lbb := manet.IsIPLoopback(b)
	if lba {
		if !lbb {
			return true
		}
	}

	// dial utp and similar 'non-fd-consuming' addresses first
	fda := isFDCostlyTransport(a)
	fdb := isFDCostlyTransport(b)
	if !fda {
		if fdb {
			return true
		}

		// if neither consumes fds, assume equal ordering
		return false
	}

	// if 'b' doesn't take a file descriptor
	if !fdb {
		return false
	}

	// if 'b' is loopback and both take file descriptors
	if lbb {
		return false
	}

	// for the rest, just sort by bytes
	return bytes.Compare(a.Bytes(), b.Bytes()) > 0
}
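
// A worked ordering example (addresses hypothetical): given
//
//	a1 = /ip4/127.0.0.1/udp/4001/utp  (loopback, non-fd)
//	a2 = /ip4/1.2.3.4/udp/4001/utp    (non-fd)
//	a3 = /ip4/1.2.3.4/tcp/4001        (fd-costly)
//
// sort.Sort(AddrList{a3, a2, a1}) orders them a1, a2, a3: loopback
// first (it fails immediately if dead), then non-fd transports, then
// fd-consuming transports.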