retry create tun device if it closed (#1279)

This commit is contained in:
Sijie.Sun
2025-08-24 15:25:09 +08:00
committed by GitHub
parent ea76114d50
commit 0804fd6632
3 changed files with 104 additions and 26 deletions

View File

@@ -8,6 +8,7 @@ use hickory_client::client::{Client, ClientHandle as _};
use hickory_proto::rr;
use hickory_proto::runtime::TokioRuntimeProvider;
use hickory_proto::udp::UdpClientStream;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use crate::common::global_ctx::tests::get_mock_global_ctx;
@@ -33,7 +34,12 @@ pub async fn prepare_env(dns_name: &str, tun_ip: Ipv4Inet) -> (Arc<PeerManager>,
replace_stun_info_collector(peer_mgr.clone(), NatType::PortRestricted);
let r = Arc::new(tokio::sync::Mutex::new(r));
let mut virtual_nic = NicCtx::new(peer_mgr.get_global_ctx(), &peer_mgr, r);
let mut virtual_nic = NicCtx::new(
peer_mgr.get_global_ctx(),
&peer_mgr,
r,
Arc::new(Notify::new()),
);
virtual_nic.run(Some(tun_ip), None).await.unwrap();
(peer_mgr, virtual_nic)

View File

@@ -3,10 +3,13 @@ use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::Context;
use cidr::{IpCidr, Ipv4Inet};
use futures::FutureExt;
use tokio::sync::{oneshot, Notify};
use tokio::{sync::Mutex, task::JoinSet};
use tokio_util::sync::CancellationToken;
@@ -425,6 +428,7 @@ impl Instance {
let default_ipv4_addr = Ipv4Inet::new(Ipv4Addr::new(10, 126, 126, 0), 24).unwrap();
let mut current_dhcp_ip: Option<Ipv4Inet> = None;
let mut next_sleep_time = 0;
let nic_closed_notifier = Arc::new(Notify::new());
loop {
tokio::time::sleep(std::time::Duration::from_secs(next_sleep_time)).await;
@@ -433,6 +437,11 @@ impl Instance {
return;
};
if nic_closed_notifier.notified().now_or_never().is_some() {
tracing::debug!("nic ctx is closed, try recreate it");
current_dhcp_ip = None;
}
// do not allocate ip if no peer connected
let routes = peer_manager_c.list_routes().await;
if routes.is_empty() {
@@ -494,6 +503,7 @@ impl Instance {
global_ctx_c.clone(),
&peer_manager_c,
_peer_packet_receiver.clone(),
nic_closed_notifier.clone(),
);
if let Err(e) = new_nic_ctx.run(Some(ip), global_ctx_c.get_ipv6()).await {
tracing::error!(
@@ -526,6 +536,75 @@ impl Instance {
});
}
fn check_for_static_ip(&self, first_round_output: oneshot::Sender<Result<(), Error>>) {
let ipv4_addr = self.global_ctx.get_ipv4();
let ipv6_addr = self.global_ctx.get_ipv6();
// Only run if we have at least one IP address (IPv4 or IPv6)
if ipv4_addr.is_none() && ipv6_addr.is_none() {
let _ = first_round_output.send(Ok(()));
return;
}
let nic_ctx = self.nic_ctx.clone();
let peer_mgr = Arc::downgrade(&self.peer_manager);
let peer_packet_receiver = self.peer_packet_receiver.clone();
tokio::spawn(async move {
let mut output_tx = Some(first_round_output);
loop {
let Some(peer_manager) = peer_mgr.upgrade() else {
tracing::warn!("peer manager is dropped, stop static ip check.");
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(Error::Unknown));
return;
}
return;
};
let close_notifier = Arc::new(Notify::new());
let mut new_nic_ctx = NicCtx::new(
peer_manager.get_global_ctx(),
&peer_manager,
peer_packet_receiver.clone(),
close_notifier.clone(),
);
if let Err(e) = new_nic_ctx.run(ipv4_addr, ipv6_addr).await {
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Err(e));
return;
}
tracing::error!("failed to create new nic ctx, err: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
let ifname = new_nic_ctx.ifname().await;
// Create Magic DNS runner only if we have IPv4
let dns_runner = if let Some(ipv4) = ipv4_addr {
Self::create_magic_dns_runner(peer_manager, ifname, ipv4)
} else {
None
};
Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx, dns_runner).await;
if let Some(output_tx) = output_tx.take() {
let _ = output_tx.send(Ok(()));
}
// NOTICE: make sure we do not hold the peer manager here,
while close_notifier.notified().now_or_never().is_none() {
tokio::time::sleep(Duration::from_secs(1)).await;
if peer_mgr.strong_count() == 0 {
tracing::warn!("peer manager is dropped, stop static ip check.");
return;
}
}
}
});
}
async fn run_quic_dst(&mut self) -> Result<(), Error> {
if self.global_ctx.get_flags().disable_quic_input {
return Ok(());
@@ -554,28 +633,9 @@ impl Instance {
if !self.global_ctx.config.get_flags().no_tun {
#[cfg(not(any(target_os = "android", target_env = "ohos")))]
{
let ipv4_addr = self.global_ctx.get_ipv4();
let ipv6_addr = self.global_ctx.get_ipv6();
// Only run if we have at least one IP address (IPv4 or IPv6)
if ipv4_addr.is_some() || ipv6_addr.is_some() {
let mut new_nic_ctx = NicCtx::new(
self.global_ctx.clone(),
&self.peer_manager,
self.peer_packet_receiver.clone(),
);
new_nic_ctx.run(ipv4_addr, ipv6_addr).await?;
let ifname = new_nic_ctx.ifname().await;
// Create Magic DNS runner only if we have IPv4
let dns_runner = if let Some(ipv4) = ipv4_addr {
Self::create_magic_dns_runner(self.peer_manager.clone(), ifname, ipv4)
} else {
None
};
Self::use_new_nic_ctx(self.nic_ctx.clone(), new_nic_ctx, dns_runner).await;
}
let (output_tx, output_rx) = oneshot::channel();
self.check_for_static_ip(output_tx);
output_rx.await.unwrap()?;
}
}
@@ -1039,10 +1099,12 @@ impl Instance {
if fd <= 0 {
return Ok(());
}
let close_notifier = Arc::new(Notify::new());
let mut new_nic_ctx = NicCtx::new(
global_ctx.clone(),
&peer_manager,
peer_packet_receiver.clone(),
close_notifier.clone(),
);
new_nic_ctx
.run_for_android(fd)

View File

@@ -29,7 +29,7 @@ use pin_project_lite::pin_project;
use pnet::packet::{ipv4::Ipv4Packet, ipv6::Ipv6Packet};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::Mutex,
sync::{Mutex, Notify},
task::JoinSet,
};
use tokio_util::bytes::Bytes;
@@ -626,6 +626,8 @@ pub struct NicCtx {
peer_mgr: Weak<PeerManager>,
peer_packet_receiver: Arc<Mutex<PacketRecvChanReceiver>>,
close_notifier: Arc<Notify>,
nic: Arc<Mutex<VirtualNic>>,
tasks: JoinSet<()>,
}
@@ -635,11 +637,15 @@ impl NicCtx {
global_ctx: ArcGlobalCtx,
peer_manager: &Arc<PeerManager>,
peer_packet_receiver: Arc<Mutex<PacketRecvChanReceiver>>,
close_notifier: Arc<Notify>,
) -> Self {
NicCtx {
global_ctx: global_ctx.clone(),
peer_mgr: Arc::downgrade(peer_manager),
peer_packet_receiver,
close_notifier,
nic: Arc::new(Mutex::new(VirtualNic::new(global_ctx))),
tasks: JoinSet::new(),
}
@@ -753,6 +759,7 @@ impl NicCtx {
let Some(mgr) = self.peer_mgr.upgrade() else {
return Err(anyhow::anyhow!("peer manager not available").into());
};
let close_notifier = self.close_notifier.clone();
self.tasks.spawn(async move {
while let Some(ret) = stream.next().await {
if ret.is_err() {
@@ -761,7 +768,8 @@ impl NicCtx {
}
Self::do_forward_nic_to_peers(ret.unwrap(), mgr.as_ref()).await;
}
panic!("nic stream closed");
close_notifier.notify_one();
tracing::error!("nic closed when recving from it");
});
Ok(())
@@ -769,6 +777,7 @@ impl NicCtx {
fn do_forward_peers_to_nic(&mut self, mut sink: Pin<Box<dyn ZCPacketSink>>) {
let channel = self.peer_packet_receiver.clone();
let close_notifier = self.close_notifier.clone();
self.tasks.spawn(async move {
// unlock until coroutine finished
let mut channel = channel.lock().await;
@@ -782,7 +791,8 @@ impl NicCtx {
tracing::error!(?ret, "do_forward_tunnel_to_nic sink error");
}
}
panic!("peer packet receiver closed");
close_notifier.notify_one();
tracing::error!("nic closed when sending to it");
});
}