mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-10-24 00:23:21 +08:00
make new stream calls accept a context
This commit is contained in:
@@ -162,8 +162,8 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
|
|||||||
// header with given protocol.ID. If there is no connection to p, attempts
|
// header with given protocol.ID. If there is no connection to p, attempts
|
||||||
// to create one. If ProtocolID is "", writes no header.
|
// to create one. If ProtocolID is "", writes no header.
|
||||||
// (Threadsafe)
|
// (Threadsafe)
|
||||||
func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
|
func (h *BasicHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) {
|
||||||
s, err := h.Network().NewStream(p)
|
s, err := h.Network().NewStream(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -32,7 +32,7 @@ func TestHostSimple(t *testing.T) {
|
|||||||
io.Copy(w, s) // mirror everything
|
io.Copy(w, s) // mirror everything
|
||||||
})
|
})
|
||||||
|
|
||||||
s, err := h1.NewStream(protocol.TestingID, h2pi.ID)
|
s, err := h1.NewStream(ctx, protocol.TestingID, h2pi.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@@ -56,7 +56,7 @@ type Host interface {
|
|||||||
// header with given protocol.ID. If there is no connection to p, attempts
|
// header with given protocol.ID. If there is no connection to p, attempts
|
||||||
// to create one. If ProtocolID is "", writes no header.
|
// to create one. If ProtocolID is "", writes no header.
|
||||||
// (Threadsafe)
|
// (Threadsafe)
|
||||||
NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error)
|
NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error)
|
||||||
|
|
||||||
// Close shuts down the host, its Network, and services.
|
// Close shuts down the host, its Network, and services.
|
||||||
Close() error
|
Close() error
|
||||||
|
@@ -114,8 +114,8 @@ func (rh *RoutedHost) RemoveStreamHandler(pid protocol.ID) {
|
|||||||
rh.host.RemoveStreamHandler(pid)
|
rh.host.RemoveStreamHandler(pid)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rh *RoutedHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
|
func (rh *RoutedHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) {
|
||||||
return rh.host.NewStream(pid, p)
|
return rh.host.NewStream(ctx, pid, p)
|
||||||
}
|
}
|
||||||
func (rh *RoutedHost) Close() error {
|
func (rh *RoutedHost) Close() error {
|
||||||
// no need to close IpfsRouting. we dont own it.
|
// no need to close IpfsRouting. we dont own it.
|
||||||
|
@@ -67,7 +67,7 @@ type Network interface {
|
|||||||
|
|
||||||
// NewStream returns a new stream to given peer p.
|
// NewStream returns a new stream to given peer p.
|
||||||
// If there is no connection to p, attempts to create one.
|
// If there is no connection to p, attempts to create one.
|
||||||
NewStream(peer.ID) (Stream, error)
|
NewStream(context.Context, peer.ID) (Stream, error)
|
||||||
|
|
||||||
// Listen tells the network to start listening on given multiaddrs.
|
// Listen tells the network to start listening on given multiaddrs.
|
||||||
Listen(...ma.Multiaddr) error
|
Listen(...ma.Multiaddr) error
|
||||||
|
@@ -325,7 +325,7 @@ func (pn *peernet) Connectedness(p peer.ID) inet.Connectedness {
|
|||||||
|
|
||||||
// NewStream returns a new stream to given peer p.
|
// NewStream returns a new stream to given peer p.
|
||||||
// If there is no connection to p, attempts to create one.
|
// If there is no connection to p, attempts to create one.
|
||||||
func (pn *peernet) NewStream(p peer.ID) (inet.Stream, error) {
|
func (pn *peernet) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
|
||||||
pn.Lock()
|
pn.Lock()
|
||||||
cs, found := pn.connsByPeer[p]
|
cs, found := pn.connsByPeer[p]
|
||||||
if !found || len(cs) < 1 {
|
if !found || len(cs) < 1 {
|
||||||
|
@@ -14,8 +14,8 @@ import (
|
|||||||
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
||||||
testutil "github.com/ipfs/go-libp2p/testutil"
|
testutil "github.com/ipfs/go-libp2p/testutil"
|
||||||
|
|
||||||
context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
|
|
||||||
detectrace "github.com/jbenet/go-detect-race"
|
detectrace "github.com/jbenet/go-detect-race"
|
||||||
|
context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
func randPeer(t *testing.T) peer.ID {
|
func randPeer(t *testing.T) peer.ID {
|
||||||
@@ -208,21 +208,21 @@ func TestNetworkSetup(t *testing.T) {
|
|||||||
// p.NetworkConns(n3)
|
// p.NetworkConns(n3)
|
||||||
|
|
||||||
// can create a stream 2->3, 3->2,
|
// can create a stream 2->3, 3->2,
|
||||||
if _, err := n2.NewStream(p3); err != nil {
|
if _, err := n2.NewStream(ctx, p3); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
if _, err := n3.NewStream(p2); err != nil {
|
if _, err := n3.NewStream(ctx, p2); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// but not 1->2 nor 2->2 (not linked), nor 1->1 (not connected)
|
// but not 1->2 nor 2->2 (not linked), nor 1->1 (not connected)
|
||||||
if _, err := n1.NewStream(p2); err == nil {
|
if _, err := n1.NewStream(ctx, p2); err == nil {
|
||||||
t.Error("should not be able to connect")
|
t.Error("should not be able to connect")
|
||||||
}
|
}
|
||||||
if _, err := n2.NewStream(p2); err == nil {
|
if _, err := n2.NewStream(ctx, p2); err == nil {
|
||||||
t.Error("should not be able to connect")
|
t.Error("should not be able to connect")
|
||||||
}
|
}
|
||||||
if _, err := n1.NewStream(p1); err == nil {
|
if _, err := n1.NewStream(ctx, p1); err == nil {
|
||||||
t.Error("should not be able to connect")
|
t.Error("should not be able to connect")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -232,7 +232,7 @@ func TestNetworkSetup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// and a stream too
|
// and a stream too
|
||||||
if _, err := n1.NewStream(p1); err != nil {
|
if _, err := n1.NewStream(ctx, p1); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -265,13 +265,14 @@ func TestNetworkSetup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// and a stream should work now too :)
|
// and a stream should work now too :)
|
||||||
if _, err := n2.NewStream(p3); err != nil {
|
if _, err := n2.NewStream(ctx, p3); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStreams(t *testing.T) {
|
func TestStreams(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
mn, err := FullMeshConnected(context.Background(), 3)
|
mn, err := FullMeshConnected(context.Background(), 3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -297,7 +298,7 @@ func TestStreams(t *testing.T) {
|
|||||||
h.SetStreamHandler(protocol.TestingID, handler)
|
h.SetStreamHandler(protocol.TestingID, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
|
s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -361,6 +362,7 @@ func makePonger(st string) func(inet.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStreamsStress(t *testing.T) {
|
func TestStreamsStress(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
nnodes := 100
|
nnodes := 100
|
||||||
if detectrace.WithRace() {
|
if detectrace.WithRace() {
|
||||||
nnodes = 50
|
nnodes = 50
|
||||||
@@ -384,7 +386,7 @@ func TestStreamsStress(t *testing.T) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
from := rand.Intn(len(hosts))
|
from := rand.Intn(len(hosts))
|
||||||
to := rand.Intn(len(hosts))
|
to := rand.Intn(len(hosts))
|
||||||
s, err := hosts[from].NewStream(protocol.TestingID, hosts[to].ID())
|
s, err := hosts[from].NewStream(ctx, protocol.TestingID, hosts[to].ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("%d (%s) %d (%s)", from, hosts[from], to, hosts[to])
|
log.Debugf("%d (%s) %d (%s)", from, hosts[from], to, hosts[to])
|
||||||
panic(err)
|
panic(err)
|
||||||
@@ -463,7 +465,8 @@ func TestAdding(t *testing.T) {
|
|||||||
t.Fatalf("no network for %s", p1)
|
t.Fatalf("no network for %s", p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := h1.NewStream(protocol.TestingID, p2)
|
ctx := context.Background()
|
||||||
|
s, err := h1.NewStream(ctx, protocol.TestingID, p2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -559,7 +562,8 @@ func TestLimitedStreams(t *testing.T) {
|
|||||||
link.SetOptions(opts)
|
link.SetOptions(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
|
ctx := context.Background()
|
||||||
|
s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@@ -192,16 +192,17 @@ func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewStreamWithPeer creates a new stream on any available connection to p
|
// NewStreamWithPeer creates a new stream on any available connection to p
|
||||||
func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) {
|
func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, error) {
|
||||||
// if we have no connections, try connecting.
|
// if we have no connections, try connecting.
|
||||||
if len(s.ConnectionsToPeer(p)) == 0 {
|
if len(s.ConnectionsToPeer(p)) == 0 {
|
||||||
log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
|
log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
|
||||||
if _, err := s.Dial(s.Context(), p); err != nil {
|
if _, err := s.Dial(ctx, p); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Debug("Swarm: NewStreamWithPeer...")
|
log.Debug("Swarm: NewStreamWithPeer...")
|
||||||
|
|
||||||
|
// TODO: think about passing a context down to NewStreamWithGroup
|
||||||
st, err := s.swarm.NewStreamWithGroup(p)
|
st, err := s.swarm.NewStreamWithGroup(p)
|
||||||
return wrapStream(st), err
|
return wrapStream(st), err
|
||||||
}
|
}
|
||||||
|
@@ -132,9 +132,9 @@ func (n *Network) Connectedness(p peer.ID) inet.Connectedness {
|
|||||||
|
|
||||||
// NewStream returns a new stream to given peer p.
|
// NewStream returns a new stream to given peer p.
|
||||||
// If there is no connection to p, attempts to create one.
|
// If there is no connection to p, attempts to create one.
|
||||||
func (n *Network) NewStream(p peer.ID) (inet.Stream, error) {
|
func (n *Network) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
|
||||||
log.Debugf("[%s] network opening stream to peer [%s]", n.local, p)
|
log.Debugf("[%s] network opening stream to peer [%s]", n.local, p)
|
||||||
s, err := n.Swarm().NewStreamWithPeer(p)
|
s, err := n.Swarm().NewStreamWithPeer(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -129,7 +129,7 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
// first, one stream per peer (nice)
|
// first, one stream per peer (nice)
|
||||||
stream, err := s1.NewStreamWithPeer(p)
|
stream, err := s1.NewStreamWithPeer(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
return
|
return
|
||||||
|
@@ -50,7 +50,7 @@ func (p *PingService) PingHandler(s inet.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) {
|
func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) {
|
||||||
s, err := ps.Host.NewStream(ID, p)
|
s, err := ps.Host.NewStream(ctx, ID, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -3,6 +3,7 @@ package relay
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
mh "gx/Qma7dqy7ZVH4tkNJdC9TRrA82Uz5fQfbbwuvmNVVc17r7a/go-multihash"
|
mh "gx/Qma7dqy7ZVH4tkNJdC9TRrA82Uz5fQfbbwuvmNVVc17r7a/go-multihash"
|
||||||
|
|
||||||
@@ -10,6 +11,7 @@ import (
|
|||||||
inet "github.com/ipfs/go-libp2p/p2p/net"
|
inet "github.com/ipfs/go-libp2p/p2p/net"
|
||||||
peer "github.com/ipfs/go-libp2p/p2p/peer"
|
peer "github.com/ipfs/go-libp2p/p2p/peer"
|
||||||
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
|
||||||
|
context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
|
||||||
logging "gx/QmfZZB1aVXWA4kaR5R4e9NifERT366TTCSagkfhmAbYLsu/go-log"
|
logging "gx/QmfZZB1aVXWA4kaR5R4e9NifERT366TTCSagkfhmAbYLsu/go-log"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -83,10 +85,15 @@ func (rs *RelayService) consumeStream(s inet.Stream) error {
|
|||||||
|
|
||||||
// pipeStream relays over a stream to a remote peer. It's like `cat`
|
// pipeStream relays over a stream to a remote peer. It's like `cat`
|
||||||
func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
|
func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
|
||||||
s2, err := rs.openStreamToPeer(dst)
|
// TODO: find a good way to pass contexts into here
|
||||||
|
nsctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s2, err := rs.openStreamToPeer(nsctx, dst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err)
|
return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err)
|
||||||
}
|
}
|
||||||
|
cancel() // cancel here because this function might last a while
|
||||||
|
|
||||||
if err := WriteHeader(s2, src, dst); err != nil {
|
if err := WriteHeader(s2, src, dst); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -116,8 +123,8 @@ func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
|
|||||||
// openStreamToPeer opens a pipe to a remote endpoint
|
// openStreamToPeer opens a pipe to a remote endpoint
|
||||||
// for now, can only open streams to directly connected peers.
|
// for now, can only open streams to directly connected peers.
|
||||||
// maybe we can do some routing later on.
|
// maybe we can do some routing later on.
|
||||||
func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) {
|
func (rs *RelayService) openStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
|
||||||
return rs.host.NewStream(ID, p)
|
return rs.host.NewStream(ctx, ID, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadHeader(r io.Reader) (src, dst peer.ID, err error) {
|
func ReadHeader(r io.Reader) (src, dst peer.ID, err error) {
|
||||||
|
@@ -50,7 +50,7 @@ func TestRelaySimple(t *testing.T) {
|
|||||||
|
|
||||||
// ok, now we can try to relay n1--->n2--->n3.
|
// ok, now we can try to relay n1--->n2--->n3.
|
||||||
log.Debug("open relay stream")
|
log.Debug("open relay stream")
|
||||||
s, err := n1.NewStream(relay.ID, n2p)
|
s, err := n1.NewStream(ctx, relay.ID, n2p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -145,7 +145,7 @@ func TestRelayAcrossFour(t *testing.T) {
|
|||||||
|
|
||||||
// ok, now we can try to relay n1--->n2--->n3--->n4--->n5
|
// ok, now we can try to relay n1--->n2--->n3--->n4--->n5
|
||||||
log.Debug("open relay stream")
|
log.Debug("open relay stream")
|
||||||
s, err := n1.NewStream(relay.ID, n2p)
|
s, err := n1.NewStream(ctx, relay.ID, n2p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -245,7 +245,7 @@ func TestRelayStress(t *testing.T) {
|
|||||||
|
|
||||||
// ok, now we can try to relay n1--->n2--->n3.
|
// ok, now we can try to relay n1--->n2--->n3.
|
||||||
log.Debug("open relay stream")
|
log.Debug("open relay stream")
|
||||||
s, err := n1.NewStream(relay.ID, n2p)
|
s, err := n1.NewStream(ctx, relay.ID, n2p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@@ -84,7 +84,7 @@ a problem.
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
s, err = host.NewStream(protocol.TestingID, remote)
|
s, err = host.NewStream(context.Background(), protocol.TestingID, remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -286,7 +286,7 @@ func TestStBackpressureStreamWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// open a stream, from 2->1, this is our reader
|
// open a stream, from 2->1, this is our reader
|
||||||
s, err := h2.NewStream(protocol.TestingID, h1.ID())
|
s, err := h2.NewStream(context.Background(), protocol.TestingID, h1.ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@@ -178,7 +178,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) {
|
|||||||
for i := 0; i < numStreams; i++ {
|
for i := 0; i < numStreams; i++ {
|
||||||
h1 := hosts[i%len(hosts)]
|
h1 := hosts[i%len(hosts)]
|
||||||
h2 := hosts[(i+1)%len(hosts)]
|
h2 := hosts[(i+1)%len(hosts)]
|
||||||
s, err := h1.NewStream(protocol.TestingID, h2.ID())
|
s, err := h1.NewStream(context.Background(), protocol.TestingID, h2.ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user