fix direct connector only select one listener (#875)
Some checks failed
EasyTier Core / pre_job (push) Has been cancelled
EasyTier Core / build_web (push) Has been cancelled
EasyTier Core / build (freebsd-13.2-x86_64, 13.2, ubuntu-22.04, x86_64-unknown-freebsd) (push) Has been cancelled
EasyTier Core / build (linux-aarch64, ubuntu-22.04, aarch64-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (linux-arm, ubuntu-22.04, arm-unknown-linux-musleabi) (push) Has been cancelled
EasyTier Core / build (linux-armhf, ubuntu-22.04, arm-unknown-linux-musleabihf) (push) Has been cancelled
EasyTier Core / build (linux-armv7, ubuntu-22.04, armv7-unknown-linux-musleabi) (push) Has been cancelled
EasyTier Core / build (linux-armv7hf, ubuntu-22.04, armv7-unknown-linux-musleabihf) (push) Has been cancelled
EasyTier Core / build (linux-mips, ubuntu-22.04, mips-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (linux-mipsel, ubuntu-22.04, mipsel-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (linux-x86_64, ubuntu-22.04, x86_64-unknown-linux-musl) (push) Has been cancelled
EasyTier Core / build (macos-aarch64, macos-latest, aarch64-apple-darwin) (push) Has been cancelled
EasyTier Core / build (macos-x86_64, macos-latest, x86_64-apple-darwin) (push) Has been cancelled
EasyTier Core / build (windows-arm64, windows-latest, aarch64-pc-windows-msvc) (push) Has been cancelled
EasyTier Core / build (windows-i686, windows-latest, i686-pc-windows-msvc) (push) Has been cancelled
EasyTier Core / build (windows-x86_64, windows-latest, x86_64-pc-windows-msvc) (push) Has been cancelled
EasyTier Core / core-result (push) Has been cancelled
EasyTier Core / magisk_build (push) Has been cancelled
EasyTier GUI / pre_job (push) Has been cancelled
EasyTier GUI / build-gui (linux-aarch64, aarch64-unknown-linux-gnu, ubuntu-22.04, aarch64-unknown-linux-musl) (push) Has been cancelled
EasyTier GUI / build-gui (linux-x86_64, x86_64-unknown-linux-gnu, ubuntu-22.04, x86_64-unknown-linux-musl) (push) Has been cancelled
EasyTier GUI / build-gui (macos-aarch64, aarch64-apple-darwin, macos-latest, aarch64-apple-darwin) (push) Has been cancelled
EasyTier GUI / build-gui (macos-x86_64, x86_64-apple-darwin, macos-latest, x86_64-apple-darwin) (push) Has been cancelled
EasyTier GUI / build-gui (windows-arm64, aarch64-pc-windows-msvc, windows-latest, aarch64-pc-windows-msvc) (push) Has been cancelled
EasyTier GUI / build-gui (windows-i686, i686-pc-windows-msvc, windows-latest, i686-pc-windows-msvc) (push) Has been cancelled
EasyTier GUI / build-gui (windows-x86_64, x86_64-pc-windows-msvc, windows-latest, x86_64-pc-windows-msvc) (push) Has been cancelled
EasyTier GUI / gui-result (push) Has been cancelled
EasyTier Mobile / pre_job (push) Has been cancelled
EasyTier Mobile / build-mobile (android, ubuntu-22.04, android) (push) Has been cancelled
EasyTier Mobile / mobile-result (push) Has been cancelled
EasyTier Test / pre_job (push) Has been cancelled
EasyTier Test / test (push) Has been cancelled

This commit is contained in:
Sijie.Sun
2025-05-25 13:56:08 +08:00
committed by GitHub
parent 29994b663a
commit b0fd37949a
3 changed files with 99 additions and 58 deletions

10
Cargo.lock generated
View File

@@ -820,15 +820,6 @@ dependencies = [
"syn 2.0.87",
]
[[package]]
name = "bounded_join_set"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae18fd8f4a623bcf416b5bc8f1e0905534d9911597ed17cc57ab9b6eed65454d"
dependencies = [
"tokio",
]
[[package]]
name = "brotli"
version = "7.0.0"
@@ -1941,7 +1932,6 @@ dependencies = [
"base64 0.22.1",
"bitflags 2.8.0",
"boringtun-easytier",
"bounded_join_set",
"bytecodec",
"byteorder",
"bytes",

View File

@@ -213,8 +213,6 @@ humantime-serde = "1.1.1"
multimap = "0.10.0"
version-compare = "0.2.0"
bounded_join_set = "0.3.0"
jemallocator = { version = "0.5.4", optional = true }
jemalloc-ctl = { version = "0.5.4", optional = true }
jemalloc-sys = { version = "0.5.4", features = [

View File

@@ -221,16 +221,18 @@ impl DirectConnectorManagerData {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn try_connect_to_ip(
self: Arc<DirectConnectorManagerData>,
dst_peer_id: PeerId,
addr: String,
) -> Result<(), Error> {
let mut rand_gen = rand::rngs::OsRng::default();
let backoff_ms = vec![1000, 2000];
let backoff_ms = vec![1000, 2000, 4000];
let mut backoff_idx = 0;
tracing::debug!(?dst_peer_id, ?addr, "try_connect_to_ip start");
self.dst_listener_blacklist.cleanup();
if self
@@ -244,12 +246,21 @@ impl DirectConnectorManagerData {
}
loop {
if self.peer_manager.has_directly_connected_conn(dst_peer_id) {
return Ok(());
}
tracing::debug!(?dst_peer_id, ?addr, "try_connect_to_ip start one round");
let ret = self.do_try_connect_to_ip(dst_peer_id, addr.clone()).await;
tracing::debug!(?ret, ?dst_peer_id, ?addr, "try_connect_to_ip return");
if ret.is_ok() {
return Ok(());
}
if self.peer_manager.has_directly_connected_conn(dst_peer_id) {
return Ok(());
}
if backoff_idx < backoff_ms.len() {
let delta = backoff_ms[backoff_idx] >> 1;
assert!(delta > 0);
@@ -273,37 +284,19 @@ impl DirectConnectorManagerData {
}
}
#[tracing::instrument]
async fn do_try_direct_connect_internal(
fn spawn_direct_connect_task(
self: &Arc<DirectConnectorManagerData>,
dst_peer_id: PeerId,
ip_list: GetIpListResponse,
) -> Result<(), Error> {
let enable_ipv6 = self.global_ctx.get_flags().enable_ipv6;
let available_listeners = ip_list
.listeners
.into_iter()
.map(Into::<url::Url>::into)
.filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None })
.filter(|l| l.port().is_some() && l.host().is_some())
.filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_)))
.collect::<Vec<_>>();
tracing::debug!(?available_listeners, "got available listeners");
if available_listeners.is_empty() {
return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into());
}
// if have default listener, use it first
let listener = available_listeners
.iter()
.find(|l| l.scheme() == self.global_ctx.get_flags().default_protocol)
.unwrap_or(available_listeners.get(0).unwrap());
let mut tasks = bounded_join_set::JoinSet::new(2);
let listener_host = listener.socket_addrs(|| None)?.pop();
ip_list: &GetIpListResponse,
listener: &url::Url,
tasks: &mut JoinSet<Result<(), Error>>,
) {
let Ok(mut addrs) = listener.socket_addrs(|| None) else {
tracing::error!(?listener, "failed to parse socket address from listener");
return;
};
let listener_host = addrs.pop();
tracing::info!(?listener_host, ?listener, "try direct connect to peer");
match listener_host {
Some(SocketAddr::V4(s_addr)) => {
if s_addr.ip().is_unspecified() {
@@ -386,30 +379,90 @@ impl DirectConnectorManagerData {
tracing::error!(?p, ?listener, "failed to parse ip version from listener");
}
}
}
while let Some(ret) = tasks.join_next().await {
match ret {
Ok(Ok(_)) => {
tracing::info!(
?dst_peer_id,
?listener,
"try direct connect to peer success"
);
#[tracing::instrument(skip(self))]
async fn do_try_direct_connect_internal(
self: &Arc<DirectConnectorManagerData>,
dst_peer_id: PeerId,
ip_list: GetIpListResponse,
) -> Result<(), Error> {
let enable_ipv6 = self.global_ctx.get_flags().enable_ipv6;
let available_listeners = ip_list
.listeners
.clone()
.into_iter()
.map(Into::<url::Url>::into)
.filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None })
.filter(|l| l.port().is_some() && l.host().is_some())
.filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_)))
.collect::<Vec<_>>();
tracing::debug!(?available_listeners, "got available listeners");
if available_listeners.is_empty() {
return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into());
}
let default_protocol = self.global_ctx.get_flags().default_protocol;
// sort available listeners, default protocol has the highest priority, udp is second, others just random
// highest priority is in the last
let mut available_listeners = available_listeners;
available_listeners.sort_by_key(|l| {
let scheme = l.scheme();
if scheme == default_protocol {
3
} else if scheme == "udp" {
2
} else {
1
}
});
while !available_listeners.is_empty() {
let mut tasks = JoinSet::new();
let mut listener_list = vec![];
let cur_scheme = available_listeners.last().unwrap().scheme().to_owned();
while let Some(listener) = available_listeners.last() {
if listener.scheme() != cur_scheme {
break;
}
Ok(Err(e)) => {
tracing::info!(?e, "try direct connect to peer failed");
}
Err(e) => {
tracing::error!(?e, "try direct connect to peer task join failed");
}
tracing::debug!("try direct connect to peer with listener: {}", listener);
self.spawn_direct_connect_task(
dst_peer_id.clone(),
&ip_list,
&listener,
&mut tasks,
);
listener_list.push(listener.clone().to_string());
available_listeners.pop();
}
let ret = tasks.join_all().await;
tracing::debug!(
?ret,
?dst_peer_id,
?cur_scheme,
?listener_list,
"all tasks finished for current scheme"
);
if self.peer_manager.has_directly_connected_conn(dst_peer_id) {
tracing::info!(
"direct connect to peer {} success, has direct conn",
dst_peer_id
);
return Ok(());
}
}
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn do_try_direct_connect(
self: Arc<DirectConnectorManagerData>,
dst_peer_id: PeerId,