diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index bce9e580e..8b0a0fd98 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -90,6 +90,8 @@ type BasicHost struct { evtLocalProtocolsUpdated event.Emitter evtLocalAddrsUpdated event.Emitter } + + addrChangeChan chan struct{} } var _ host.Host = (*BasicHost)(nil) @@ -130,12 +132,13 @@ type HostOpts struct { // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) { h := &BasicHost{ - network: net, - mux: msmux.NewMultistreamMuxer(), - negtimeout: DefaultNegotiationTimeout, - AddrsFactory: DefaultAddrsFactory, - maResolver: madns.DefaultResolver, - eventbus: eventbus.NewBus(), + network: net, + mux: msmux.NewMultistreamMuxer(), + negtimeout: DefaultNegotiationTimeout, + AddrsFactory: DefaultAddrsFactory, + maResolver: madns.DefaultResolver, + eventbus: eventbus.NewBus(), + addrChangeChan: make(chan struct{}, 1), } var err error @@ -230,6 +233,7 @@ func New(net network.Network, opts ...interface{}) *BasicHost { } h, err := NewHost(context.Background(), net, hostopts) + h.Start() if err != nil { // this cannot happen with legacy options // plus we want to keep the (deprecated) legacy interface unchanged @@ -300,24 +304,13 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { go handle(protoID, s) } -// CheckForAddressChanges determines whether our listen addresses have recently -// changed and emits an EvtLocalAddressesUpdatedEvent & a Push Identify if so. +// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently +// changed. // Warning: this interface is unstable and may disappear in the future. -func (h *BasicHost) CheckForAddressChanges() { - h.mx.Lock() - addrs := h.Addrs() - changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs) - if changeEvt != nil { - h.lastAddrs = addrs - } - h.mx.Unlock() - - if changeEvt != nil { - err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt) - if err != nil { - log.Warnf("error emitting event for updated addrs: %s", err) - } - h.ids.Push() +func (h *BasicHost) SignalAddressChange() { + select { + case h.addrChangeChan <- struct{}{}: + default: } } @@ -366,10 +359,36 @@ func (h *BasicHost) background(p goprocess.Process) { } h.mx.Unlock() + // emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed. + go func() { + for { + select { + case <-h.addrChangeChan: + h.mx.Lock() + addrs := h.Addrs() + changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs) + if changeEvt != nil { + h.lastAddrs = addrs + } + h.mx.Unlock() + + if changeEvt != nil { + err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt) + if err != nil { + log.Warnf("error emitting event for updated addrs: %s", err) + } + h.ids.Push() + } + case <-p.Closing(): + return + } + } + }() + for { select { case <-ticker.C: - h.CheckForAddressChanges() + h.SignalAddressChange() case <-p.Closing(): return diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index dd3593a89..c0dee8e57 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -6,6 +6,7 @@ import ( "io" "reflect" "sort" + "sync" "testing" "time" @@ -518,7 +519,7 @@ func TestHostAddrChangeDetection(t *testing.T) { {ma.StringCast("/ip4/2.3.4.5/tcp/1234"), ma.StringCast("/ip4/3.4.5.6/tcp/4321")}, } - // The events we expect the host to emit when CheckForAddressChanges is called + // The events we expect the host to emit when SignalAddressChange is called // and the changes between addr sets are detected expectedEvents := []event.EvtLocalAddressesUpdated{ { @@ -548,8 +549,11 @@ func TestHostAddrChangeDetection(t *testing.T) { }, } + var lk sync.Mutex currentAddrSet := 0 addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr { + lk.Lock() + defer lk.Unlock() return addrSets[currentAddrSet] } @@ -563,46 +567,40 @@ func TestHostAddrChangeDetection(t *testing.T) { } defer sub.Close() + // wait for the host background thread to start + time.Sleep(1 * time.Second) // host should start with no addrs (addrSet 0) addrs := h.Addrs() if len(addrs) != 0 { t.Fatalf("expected 0 addrs, got %d", len(addrs)) } - // Advance between addrSets + // change addr, signal and assert event for i := 1; i < len(addrSets); i++ { + lk.Lock() currentAddrSet = i - h.CheckForAddressChanges() // forces the host to check for changes now, instead of waiting for background update - } + lk.Unlock() + h.SignalAddressChange() + evt := waitForAddrChangeEvent(ctx, sub, t) + if !updatedAddrEventsEqual(expectedEvents[i-1], evt) { + t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt) + } - // drain events from the subscription - var receivedEvents []event.EvtLocalAddressesUpdated -readEvents: + } +} + +func waitForAddrChangeEvent(ctx context.Context, sub event.Subscription, t *testing.T) event.EvtLocalAddressesUpdated { for { select { case evt, more := <-sub.Out(): if !more { - break readEvents - } - receivedEvents = append(receivedEvents, evt.(event.EvtLocalAddressesUpdated)) - if len(receivedEvents) == len(expectedEvents) { - break readEvents + t.Fatal("channel should not be closed") } + return evt.(event.EvtLocalAddressesUpdated) case <-ctx.Done(): - break readEvents - case <-time.After(1 * time.Second): - break readEvents - } - } - - // assert that we received the events we expected - if len(receivedEvents) != len(expectedEvents) { - t.Errorf("expected to receive %d addr change events, got %d", len(expectedEvents), len(receivedEvents)) - } - for i, expected := range expectedEvents { - actual := receivedEvents[i] - if !updatedAddrEventsEqual(expected, actual) { - t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, actual) + t.Fatal("context should not have cancelled") + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for address change event") } } } diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go index 3ef741b08..a1691efb2 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/relay/autorelay.go @@ -119,7 +119,7 @@ func (ar *AutoRelay) background(ctx context.Context) { ar.cachedAddrs = nil ar.mx.Unlock() push = false - ar.host.CheckForAddressChanges() + ar.host.SignalAddressChange() } } }