diff --git a/easytier/src/instance/dns_server/tests.rs b/easytier/src/instance/dns_server/tests.rs index fb046b2..443d4cb 100644 --- a/easytier/src/instance/dns_server/tests.rs +++ b/easytier/src/instance/dns_server/tests.rs @@ -8,6 +8,7 @@ use hickory_client::client::{Client, ClientHandle as _}; use hickory_proto::rr; use hickory_proto::runtime::TokioRuntimeProvider; use hickory_proto::udp::UdpClientStream; +use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use crate::common::global_ctx::tests::get_mock_global_ctx; @@ -33,7 +34,12 @@ pub async fn prepare_env(dns_name: &str, tun_ip: Ipv4Inet) -> (Arc, replace_stun_info_collector(peer_mgr.clone(), NatType::PortRestricted); let r = Arc::new(tokio::sync::Mutex::new(r)); - let mut virtual_nic = NicCtx::new(peer_mgr.get_global_ctx(), &peer_mgr, r); + let mut virtual_nic = NicCtx::new( + peer_mgr.get_global_ctx(), + &peer_mgr, + r, + Arc::new(Notify::new()), + ); virtual_nic.run(Some(tun_ip), None).await.unwrap(); (peer_mgr, virtual_nic) diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 9ad6a43..9d49d5e 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -3,10 +3,13 @@ use std::collections::HashSet; use std::net::{IpAddr, Ipv4Addr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; +use std::time::Duration; use anyhow::Context; use cidr::{IpCidr, Ipv4Inet}; +use futures::FutureExt; +use tokio::sync::{oneshot, Notify}; use tokio::{sync::Mutex, task::JoinSet}; use tokio_util::sync::CancellationToken; @@ -425,6 +428,7 @@ impl Instance { let default_ipv4_addr = Ipv4Inet::new(Ipv4Addr::new(10, 126, 126, 0), 24).unwrap(); let mut current_dhcp_ip: Option = None; let mut next_sleep_time = 0; + let nic_closed_notifier = Arc::new(Notify::new()); loop { tokio::time::sleep(std::time::Duration::from_secs(next_sleep_time)).await; @@ -433,6 +437,11 @@ impl Instance { return; }; + if nic_closed_notifier.notified().now_or_never().is_some() { + tracing::debug!("nic ctx is closed, try recreate it"); + current_dhcp_ip = None; + } + // do not allocate ip if no peer connected let routes = peer_manager_c.list_routes().await; if routes.is_empty() { @@ -494,6 +503,7 @@ impl Instance { global_ctx_c.clone(), &peer_manager_c, _peer_packet_receiver.clone(), + nic_closed_notifier.clone(), ); if let Err(e) = new_nic_ctx.run(Some(ip), global_ctx_c.get_ipv6()).await { tracing::error!( @@ -526,6 +536,75 @@ impl Instance { }); } + fn check_for_static_ip(&self, first_round_output: oneshot::Sender>) { + let ipv4_addr = self.global_ctx.get_ipv4(); + let ipv6_addr = self.global_ctx.get_ipv6(); + + // Only run if we have at least one IP address (IPv4 or IPv6) + if ipv4_addr.is_none() && ipv6_addr.is_none() { + let _ = first_round_output.send(Ok(())); + return; + } + + let nic_ctx = self.nic_ctx.clone(); + let peer_mgr = Arc::downgrade(&self.peer_manager); + let peer_packet_receiver = self.peer_packet_receiver.clone(); + + tokio::spawn(async move { + let mut output_tx = Some(first_round_output); + loop { + let Some(peer_manager) = peer_mgr.upgrade() else { + tracing::warn!("peer manager is dropped, stop static ip check."); + if let Some(output_tx) = output_tx.take() { + let _ = output_tx.send(Err(Error::Unknown)); + return; + } + return; + }; + + let close_notifier = Arc::new(Notify::new()); + let mut new_nic_ctx = NicCtx::new( + peer_manager.get_global_ctx(), + &peer_manager, + peer_packet_receiver.clone(), + close_notifier.clone(), + ); + + if let Err(e) = new_nic_ctx.run(ipv4_addr, ipv6_addr).await { + if let Some(output_tx) = output_tx.take() { + let _ = output_tx.send(Err(e)); + return; + } + tracing::error!("failed to create new nic ctx, err: {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + let ifname = new_nic_ctx.ifname().await; + + // Create Magic DNS runner only if we have IPv4 + let dns_runner = if let Some(ipv4) = ipv4_addr { + Self::create_magic_dns_runner(peer_manager, ifname, ipv4) + } else { + None + }; + Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx, dns_runner).await; + + if let Some(output_tx) = output_tx.take() { + let _ = output_tx.send(Ok(())); + } + + // NOTICE: make sure we do not hold the peer manager here, + while close_notifier.notified().now_or_never().is_none() { + tokio::time::sleep(Duration::from_secs(1)).await; + if peer_mgr.strong_count() == 0 { + tracing::warn!("peer manager is dropped, stop static ip check."); + return; + } + } + } + }); + } + async fn run_quic_dst(&mut self) -> Result<(), Error> { if self.global_ctx.get_flags().disable_quic_input { return Ok(()); @@ -554,28 +633,9 @@ impl Instance { if !self.global_ctx.config.get_flags().no_tun { #[cfg(not(any(target_os = "android", target_env = "ohos")))] { - let ipv4_addr = self.global_ctx.get_ipv4(); - let ipv6_addr = self.global_ctx.get_ipv6(); - - // Only run if we have at least one IP address (IPv4 or IPv6) - if ipv4_addr.is_some() || ipv6_addr.is_some() { - let mut new_nic_ctx = NicCtx::new( - self.global_ctx.clone(), - &self.peer_manager, - self.peer_packet_receiver.clone(), - ); - - new_nic_ctx.run(ipv4_addr, ipv6_addr).await?; - let ifname = new_nic_ctx.ifname().await; - - // Create Magic DNS runner only if we have IPv4 - let dns_runner = if let Some(ipv4) = ipv4_addr { - Self::create_magic_dns_runner(self.peer_manager.clone(), ifname, ipv4) - } else { - None - }; - Self::use_new_nic_ctx(self.nic_ctx.clone(), new_nic_ctx, dns_runner).await; - } + let (output_tx, output_rx) = oneshot::channel(); + self.check_for_static_ip(output_tx); + output_rx.await.unwrap()?; } } @@ -1039,10 +1099,12 @@ impl Instance { if fd <= 0 { return Ok(()); } + let close_notifier = Arc::new(Notify::new()); let mut new_nic_ctx = NicCtx::new( global_ctx.clone(), &peer_manager, peer_packet_receiver.clone(), + close_notifier.clone(), ); new_nic_ctx .run_for_android(fd) diff --git a/easytier/src/instance/virtual_nic.rs b/easytier/src/instance/virtual_nic.rs index cb4d94f..2aa3fd8 100644 --- a/easytier/src/instance/virtual_nic.rs +++ b/easytier/src/instance/virtual_nic.rs @@ -29,7 +29,7 @@ use pin_project_lite::pin_project; use pnet::packet::{ipv4::Ipv4Packet, ipv6::Ipv6Packet}; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, - sync::Mutex, + sync::{Mutex, Notify}, task::JoinSet, }; use tokio_util::bytes::Bytes; @@ -626,6 +626,8 @@ pub struct NicCtx { peer_mgr: Weak, peer_packet_receiver: Arc>, + close_notifier: Arc, + nic: Arc>, tasks: JoinSet<()>, } @@ -635,11 +637,15 @@ impl NicCtx { global_ctx: ArcGlobalCtx, peer_manager: &Arc, peer_packet_receiver: Arc>, + close_notifier: Arc, ) -> Self { NicCtx { global_ctx: global_ctx.clone(), peer_mgr: Arc::downgrade(peer_manager), peer_packet_receiver, + + close_notifier, + nic: Arc::new(Mutex::new(VirtualNic::new(global_ctx))), tasks: JoinSet::new(), } @@ -753,6 +759,7 @@ impl NicCtx { let Some(mgr) = self.peer_mgr.upgrade() else { return Err(anyhow::anyhow!("peer manager not available").into()); }; + let close_notifier = self.close_notifier.clone(); self.tasks.spawn(async move { while let Some(ret) = stream.next().await { if ret.is_err() { @@ -761,7 +768,8 @@ impl NicCtx { } Self::do_forward_nic_to_peers(ret.unwrap(), mgr.as_ref()).await; } - panic!("nic stream closed"); + close_notifier.notify_one(); + tracing::error!("nic closed when recving from it"); }); Ok(()) @@ -769,6 +777,7 @@ impl NicCtx { fn do_forward_peers_to_nic(&mut self, mut sink: Pin>) { let channel = self.peer_packet_receiver.clone(); + let close_notifier = self.close_notifier.clone(); self.tasks.spawn(async move { // unlock until coroutine finished let mut channel = channel.lock().await; @@ -782,7 +791,8 @@ impl NicCtx { tracing::error!(?ret, "do_forward_tunnel_to_nic sink error"); } } - panic!("peer packet receiver closed"); + close_notifier.notify_one(); + tracing::error!("nic closed when sending to it"); }); }