diff --git a/Cargo.lock b/Cargo.lock index ca70395..2640d50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,9 +240,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "async-broadcast" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" dependencies = [ "event-listener", "event-listener-strategy", @@ -1925,6 +1925,9 @@ dependencies = [ "http", "http_req", "humansize", + "jemalloc-ctl", + "jemalloc-sys", + "jemallocator", "kcp-sys", "machine-uid", "mimalloc-rust", @@ -3715,6 +3718,37 @@ dependencies = [ "system-deps", ] +[[package]] +name = "jemalloc-ctl" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cffc705424a344c054e135d12ee591402f4539245e8bbd64e6c9eaa9458b63c" +dependencies = [ + "jemalloc-sys", + "libc", + "paste", +] + +[[package]] +name = "jemalloc-sys" +version = "0.5.4+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "jni" version = "0.19.0" diff --git a/Cargo.toml b/Cargo.toml index 33d740e..37a29d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,11 @@ [workspace] resolver = "2" -members = ["easytier", "easytier-gui/src-tauri", "easytier-rpc-build", "easytier-web"] +members = [ + "easytier", + "easytier-gui/src-tauri", + "easytier-rpc-build", + "easytier-web", +] default-members = ["easytier", "easytier-web"] [profile.dev] diff --git a/easytier-web/frontend-lib/src/types/network.ts b/easytier-web/frontend-lib/src/types/network.ts index 2bf94e3..c6a0bf9 100644 --- a/easytier-web/frontend-lib/src/types/network.ts +++ b/easytier-web/frontend-lib/src/types/network.ts @@ -93,7 +93,7 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig { 'udp://0.0.0.0:11010', 'wg://0.0.0.0:11011', ], - rpc_port: 15888, + rpc_port: 0, latency_first: false, dev_name: '', diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index c340ca4..3582ee3 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -205,6 +205,14 @@ hickory-resolver = "0.24.4" bounded_join_set = "0.3.0" +jemallocator = { version = "0.5.4", optional = true } +jemalloc-ctl = { version = "0.5.4", optional = true } +jemalloc-sys = { version = "0.5.4", features = [ + "stats", + "profiling", + "unprefixed_malloc_on_supported_platforms", +], optional = true } + [target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies] machine-uid = "0.5.3" @@ -280,3 +288,4 @@ websocket = [ ] smoltcp = ["dep:smoltcp", "dep:parking_lot"] socks5 = ["dep:smoltcp"] +jemalloc = ["dep:jemallocator", "dep:jemalloc-ctl", "dep:jemalloc-sys"] diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 3774f9c..bae73ab 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -97,7 +97,7 @@ impl GlobalCtx { let net_ns = NetNS::new(config_fs.get_netns()); let hostname = config_fs.get_hostname(); - let (event_bus, _) = tokio::sync::broadcast::channel(1024); + let (event_bus, _) = tokio::sync::broadcast::channel(8); let stun_info_collection = Arc::new(StunInfoCollector::new_with_default_servers()); @@ -141,10 +141,13 @@ impl GlobalCtx { } pub fn issue_event(&self, event: GlobalCtxEvent) { - if self.event_bus.receiver_count() != 0 { - self.event_bus.send(event).unwrap(); - } else { - tracing::warn!("No subscriber for event: {:?}", event); + if let Err(e) = self.event_bus.send(event.clone()) { + tracing::warn!( + "Failed to send event: {:?}, error: {:?}, receiver count: {}", + event, + e, + self.event_bus.receiver_count() + ); } } diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 8434e59..edd737d 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -3,7 +3,10 @@ use std::{collections::BTreeSet, sync::Arc}; use anyhow::Context; use dashmap::{DashMap, DashSet}; use tokio::{ - sync::{broadcast::Receiver, mpsc, Mutex}, + sync::{ + broadcast::{error::RecvError, Receiver}, + mpsc, Mutex, + }, task::JoinSet, time::timeout, }; @@ -179,8 +182,37 @@ impl ManualConnectorManager { mut event_recv: Receiver, ) { loop { - let event = event_recv.recv().await.expect("event_recv got error"); - Self::handle_event(&event, &data).await; + match event_recv.recv().await { + Ok(event) => { + Self::handle_event(&event, &data).await; + } + Err(RecvError::Lagged(n)) => { + tracing::warn!("event_recv lagged: {}, rebuild alive conn list", n); + event_recv = event_recv.resubscribe(); + data.alive_conn_urls.clear(); + for x in data + .peer_manager + .get_peer_map() + .get_alive_conns() + .iter() + .map(|x| { + x.tunnel + .clone() + .unwrap_or_default() + .remote_addr + .unwrap_or_default() + .to_string() + }) + { + data.alive_conn_urls.insert(x); + } + continue; + } + Err(RecvError::Closed) => { + tracing::warn!("event_recv closed, exit"); + break; + } + } } } @@ -271,7 +303,6 @@ impl ManualConnectorManager { async fn collect_dead_conns(data: Arc) -> BTreeSet { Self::handle_remove_connector(data.clone()); - let all_urls: BTreeSet = data .connectors .iter() diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 3eda485..b911fb4 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -6,12 +6,12 @@ extern crate rust_i18n; use std::{ net::{Ipv4Addr, SocketAddr}, path::PathBuf, + process::ExitCode, sync::Arc, }; use anyhow::Context; use clap::Parser; -use tokio::net::TcpSocket; use easytier::{ common::{ @@ -38,13 +38,58 @@ use easytier::{ #[cfg(target_os = "windows")] windows_service::define_windows_service!(ffi_service_main, win_service_main); -#[cfg(feature = "mimalloc")] +#[cfg(all(feature = "mimalloc", not(feature = "jemalloc")))] use mimalloc_rust::GlobalMiMalloc; -#[cfg(feature = "mimalloc")] +#[cfg(all(feature = "mimalloc", not(feature = "jemalloc")))] #[global_allocator] static GLOBAL_MIMALLOC: GlobalMiMalloc = GlobalMiMalloc; +#[cfg(feature = "jemalloc")] +use jemalloc_ctl::{epoch, stats, Access as _, AsName as _}; + +#[cfg(feature = "jemalloc")] +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +fn set_prof_active(_active: bool) { + #[cfg(feature = "jemalloc")] + { + const PROF_ACTIVE: &'static [u8] = b"prof.active\0"; + let name = PROF_ACTIVE.name(); + name.write(_active).expect("Should succeed to set prof"); + } +} + +fn dump_profile(_cur_allocated: usize) { + #[cfg(feature = "jemalloc")] + { + const PROF_DUMP: &'static [u8] = b"prof.dump\0"; + static mut PROF_DUMP_FILE_NAME: [u8; 128] = [0; 128]; + let file_name_str = format!( + "profile-{}-{}.out", + _cur_allocated, + chrono::Local::now().format("%Y-%m-%d-%H-%M-%S") + ); + // copy file name to PROF_DUMP + let file_name = file_name_str.as_bytes(); + let len = file_name.len(); + if len > 127 { + panic!("file name too long"); + } + unsafe { + PROF_DUMP_FILE_NAME[..len].copy_from_slice(file_name); + // set the last byte to 0 + PROF_DUMP_FILE_NAME[len] = 0; + + let name = PROF_DUMP.name(); + name.write(&PROF_DUMP_FILE_NAME[..len + 1]) + .expect("Should succeed to dump profile"); + println!("dump profile to: {}", file_name_str); + } + } +} + #[derive(Parser, Debug)] #[command(name = "easytier-core", author, version = EASYTIER_VERSION , about, long_about = None)] struct Cli { @@ -116,7 +161,7 @@ struct Cli { short, long, help = t!("core_clap.rpc_portal").to_string(), - default_value = "15888" + default_value = "0" )] rpc_portal: String, @@ -391,22 +436,8 @@ impl Cli { Ok(listeners) } - fn check_tcp_available(port: u16) -> Option { - let s = format!("0.0.0.0:{}", port).parse::().unwrap(); - TcpSocket::new_v4().unwrap().bind(s).map(|_| s).ok() - } - fn parse_rpc_portal(rpc_portal: String) -> anyhow::Result { if let Ok(port) = rpc_portal.parse::() { - if port == 0 { - // check tcp 15888 first - for i in 15888..15900 { - if let Some(s) = Cli::check_tcp_available(i) { - return Ok(s); - } - } - return Ok("0.0.0.0:0".parse().unwrap()); - } return Ok(format!("0.0.0.0:{}", port).parse().unwrap()); } @@ -652,114 +683,118 @@ fn peer_conn_info_to_string(p: proto::cli::PeerConnInfo) -> String { #[tracing::instrument] pub fn handle_event(mut events: EventBusSubscriber) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - while let Ok(e) = events.recv().await { - match e { - GlobalCtxEvent::PeerAdded(p) => { - print_event(format!("new peer added. peer_id: {}", p)); - } - - GlobalCtxEvent::PeerRemoved(p) => { - print_event(format!("peer removed. peer_id: {}", p)); - } - - GlobalCtxEvent::PeerConnAdded(p) => { - print_event(format!( - "new peer connection added. conn_info: {}", - peer_conn_info_to_string(p) - )); - } - - GlobalCtxEvent::PeerConnRemoved(p) => { - print_event(format!( - "peer connection removed. conn_info: {}", - peer_conn_info_to_string(p) - )); - } - - GlobalCtxEvent::ListenerAddFailed(p, msg) => { - print_event(format!( - "listener add failed. listener: {}, msg: {}", - p, msg - )); - } - - GlobalCtxEvent::ListenerAcceptFailed(p, msg) => { - print_event(format!( - "listener accept failed. listener: {}, msg: {}", - p, msg - )); - } - - GlobalCtxEvent::ListenerAdded(p) => { - if p.scheme() == "ring" { - continue; + loop { + if let Ok(e) = events.recv().await { + match e { + GlobalCtxEvent::PeerAdded(p) => { + print_event(format!("new peer added. peer_id: {}", p)); } - print_event(format!("new listener added. listener: {}", p)); - } - GlobalCtxEvent::ConnectionAccepted(local, remote) => { - print_event(format!( - "new connection accepted. local: {}, remote: {}", - local, remote - )); - } + GlobalCtxEvent::PeerRemoved(p) => { + print_event(format!("peer removed. peer_id: {}", p)); + } - GlobalCtxEvent::ConnectionError(local, remote, err) => { - print_event(format!( - "connection error. local: {}, remote: {}, err: {}", - local, remote, err - )); - } + GlobalCtxEvent::PeerConnAdded(p) => { + print_event(format!( + "new peer connection added. conn_info: {}", + peer_conn_info_to_string(p) + )); + } - GlobalCtxEvent::TunDeviceReady(dev) => { - print_event(format!("tun device ready. dev: {}", dev)); - } + GlobalCtxEvent::PeerConnRemoved(p) => { + print_event(format!( + "peer connection removed. conn_info: {}", + peer_conn_info_to_string(p) + )); + } - GlobalCtxEvent::TunDeviceError(err) => { - print_event(format!("tun device error. err: {}", err)); - } + GlobalCtxEvent::ListenerAddFailed(p, msg) => { + print_event(format!( + "listener add failed. listener: {}, msg: {}", + p, msg + )); + } - GlobalCtxEvent::Connecting(dst) => { - print_event(format!("connecting to peer. dst: {}", dst)); - } + GlobalCtxEvent::ListenerAcceptFailed(p, msg) => { + print_event(format!( + "listener accept failed. listener: {}, msg: {}", + p, msg + )); + } - GlobalCtxEvent::ConnectError(dst, ip_version, err) => { - print_event(format!( - "connect to peer error. dst: {}, ip_version: {}, err: {}", - dst, ip_version, err - )); - } + GlobalCtxEvent::ListenerAdded(p) => { + if p.scheme() == "ring" { + continue; + } + print_event(format!("new listener added. listener: {}", p)); + } - GlobalCtxEvent::VpnPortalClientConnected(portal, client_addr) => { - print_event(format!( - "vpn portal client connected. portal: {}, client_addr: {}", - portal, client_addr - )); - } + GlobalCtxEvent::ConnectionAccepted(local, remote) => { + print_event(format!( + "new connection accepted. local: {}, remote: {}", + local, remote + )); + } - GlobalCtxEvent::VpnPortalClientDisconnected(portal, client_addr) => { - print_event(format!( - "vpn portal client disconnected. portal: {}, client_addr: {}", - portal, client_addr - )); - } + GlobalCtxEvent::ConnectionError(local, remote, err) => { + print_event(format!( + "connection error. local: {}, remote: {}, err: {}", + local, remote, err + )); + } - GlobalCtxEvent::DhcpIpv4Changed(old, new) => { - print_event(format!("dhcp ip changed. old: {:?}, new: {:?}", old, new)); - } + GlobalCtxEvent::TunDeviceReady(dev) => { + print_event(format!("tun device ready. dev: {}", dev)); + } - GlobalCtxEvent::DhcpIpv4Conflicted(ip) => { - print_event(format!("dhcp ip conflict. ip: {:?}", ip)); - } + GlobalCtxEvent::TunDeviceError(err) => { + print_event(format!("tun device error. err: {}", err)); + } - GlobalCtxEvent::PortForwardAdded(cfg) => { - print_event(format!( - "port forward added. local: {}, remote: {}, proto: {}", - cfg.bind_addr.unwrap().to_string(), - cfg.dst_addr.unwrap().to_string(), - cfg.socket_type().as_str_name() - )); + GlobalCtxEvent::Connecting(dst) => { + print_event(format!("connecting to peer. dst: {}", dst)); + } + + GlobalCtxEvent::ConnectError(dst, ip_version, err) => { + print_event(format!( + "connect to peer error. dst: {}, ip_version: {}, err: {}", + dst, ip_version, err + )); + } + + GlobalCtxEvent::VpnPortalClientConnected(portal, client_addr) => { + print_event(format!( + "vpn portal client connected. portal: {}, client_addr: {}", + portal, client_addr + )); + } + + GlobalCtxEvent::VpnPortalClientDisconnected(portal, client_addr) => { + print_event(format!( + "vpn portal client disconnected. portal: {}, client_addr: {}", + portal, client_addr + )); + } + + GlobalCtxEvent::DhcpIpv4Changed(old, new) => { + print_event(format!("dhcp ip changed. old: {:?}, new: {:?}", old, new)); + } + + GlobalCtxEvent::DhcpIpv4Conflicted(ip) => { + print_event(format!("dhcp ip conflict. ip: {:?}", ip)); + } + + GlobalCtxEvent::PortForwardAdded(cfg) => { + print_event(format!( + "port forward added. local: {}, remote: {}, proto: {}", + cfg.bind_addr.unwrap().to_string(), + cfg.dst_addr.unwrap().to_string(), + cfg.socket_type().as_str_name() + )); + } } + } else { + events = events.resubscribe(); } } }) @@ -940,14 +975,61 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> { let mut l = launcher::NetworkInstance::new(cfg).set_fetch_node_info(false); let _t = ScopedTask::from(handle_event(l.start().unwrap())); - if let Some(e) = l.wait().await { - anyhow::bail!("launcher error: {}", e); + tokio::select! { + e = l.wait() => { + if let Some(e) = e { + eprintln!("launcher error: {}", e); + } + } + _ = tokio::signal::ctrl_c() => { + println!("ctrl-c received, exiting..."); + } } Ok(()) } +fn memory_monitor() { + #[cfg(feature = "jemalloc")] + { + let mut last_peak_size = 0; + let e = epoch::mib().unwrap(); + let allocated_stats = stats::allocated::mib().unwrap(); + + loop { + e.advance().unwrap(); + let new_heap_size = allocated_stats.read().unwrap(); + + println!( + "heap size: {} bytes, time: {}", + new_heap_size, + chrono::Local::now().format("%Y-%m-%d %H:%M:%S") + ); + + // dump every 75MB + if last_peak_size > 0 + && new_heap_size > last_peak_size + && new_heap_size - last_peak_size > 75 * 1024 * 1024 + { + println!( + "heap size increased: {} bytes, time: {}", + new_heap_size - last_peak_size, + chrono::Local::now().format("%Y-%m-%d %H:%M:%S") + ); + dump_profile(new_heap_size); + last_peak_size = new_heap_size; + } + + if last_peak_size == 0 { + last_peak_size = new_heap_size; + } + + std::thread::sleep(std::time::Duration::from_secs(5)); + } + } +} + #[tokio::main(flavor = "current_thread")] -async fn main() { +async fn main() -> ExitCode { let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US")); rust_i18n::set_locale(&locale); setup_panic_handler(); @@ -968,10 +1050,21 @@ async fn main() { } }; + set_prof_active(true); + let _monitor = std::thread::spawn(memory_monitor); + let cli = Cli::parse(); + let mut ret_code = 0; if let Err(e) = run_main(cli).await { eprintln!("error: {:?}", e); - std::process::exit(1); + ret_code = 1; } + + println!("Stopping easytier..."); + + dump_profile(0); + set_prof_active(false); + + ExitCode::from(ret_code) } diff --git a/easytier/src/instance/virtual_nic.rs b/easytier/src/instance/virtual_nic.rs index eda8acf..7a2625c 100644 --- a/easytier/src/instance/virtual_nic.rs +++ b/easytier/src/instance/virtual_nic.rs @@ -65,7 +65,7 @@ impl Stream for TunStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut self_mut = self.project(); let mut g = ready!(self_mut.l.poll_lock(cx)); - reserve_buf(&mut self_mut.cur_buf, 2500, 32 * 1024); + reserve_buf(&mut self_mut.cur_buf, 2500, 4 * 1024); if self_mut.cur_buf.len() == 0 { unsafe { self_mut.cur_buf.set_len(*self_mut.payload_offset); diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index e0f0d07..cd70a66 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -1,5 +1,6 @@ use std::{ collections::VecDeque, + net::SocketAddr, sync::{atomic::AtomicBool, Arc, RwLock}, }; @@ -42,7 +43,7 @@ struct EasyTierData { impl Default for EasyTierData { fn default() -> Self { - let (tx, _) = broadcast::channel(100); + let (tx, _) = broadcast::channel(16); Self { event_subscriber: RwLock::new(tx), events: RwLock::new(VecDeque::new()), @@ -144,8 +145,19 @@ impl EasyTierLauncher { let data_c = data.clone(); tasks.spawn(async move { let mut receiver = global_ctx.subscribe(); - while let Ok(event) = receiver.recv().await { - Self::handle_easytier_event(event, &data_c).await; + loop { + match receiver.recv().await { + Ok(event) => { + Self::handle_easytier_event(event.clone(), &data_c).await; + } + Err(broadcast::error::RecvError::Closed) => { + break; + } + Err(broadcast::error::RecvError::Lagged(_)) => { + // do nothing currently + receiver = receiver.resubscribe(); + } + } } }); @@ -202,6 +214,27 @@ impl EasyTierLauncher { Ok(()) } + fn check_tcp_available(port: u16) -> bool { + let s = format!("0.0.0.0:{}", port).parse::().unwrap(); + std::net::TcpListener::bind(s).is_ok() + } + + fn select_proper_rpc_port(cfg: &TomlConfigLoader) { + let Some(mut f) = cfg.get_rpc_portal() else { + return; + }; + + if f.port() == 0 { + for i in 15888..15900 { + if Self::check_tcp_available(i) { + f.set_port(i); + cfg.set_rpc_portal(f); + break; + } + } + } + } + pub fn start(&mut self, cfg_generator: F) where F: FnOnce() -> Result + Send + Sync, @@ -217,6 +250,8 @@ impl EasyTierLauncher { self.running_cfg = cfg.dump(); + Self::select_proper_rpc_port(&cfg); + let stop_flag = self.stop_flag.clone(); let instance_alive = self.instance_alive.clone(); @@ -522,7 +557,8 @@ impl NetworkConfig { let mut routes = Vec::::with_capacity(self.routes.len()); for route in self.routes.iter() { routes.push( - route.parse() + route + .parse() .with_context(|| format!("failed to parse route: {}", route))?, ); } @@ -543,9 +579,7 @@ impl NetworkConfig { if self.enable_socks5.unwrap_or_default() { if let Some(socks5_port) = self.socks5_port { cfg.set_socks5_portal(Some( - format!("socks5://0.0.0.0:{}", socks5_port) - .parse() - .unwrap(), + format!("socks5://0.0.0.0:{}", socks5_port).parse().unwrap(), )); } } diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index e3bb12e..0863460 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -118,9 +118,21 @@ impl Peer { } pub async fn add_peer_conn(&self, mut conn: PeerConn) { - conn.set_close_event_sender(self.close_event_sender.clone()); + let close_event_sender = self.close_event_sender.clone(); + let close_notifier = conn.get_close_notifier(); + tokio::spawn(async move { + let conn_id = close_notifier.get_conn_id(); + if let Some(mut waiter) = close_notifier.get_waiter().await { + let _ = waiter.recv().await; + } + if let Err(e) = close_event_sender.send(conn_id).await { + tracing::warn!(?conn_id, "failed to send close event: {}", e); + } + }); + conn.start_recv_loop(self.packet_recv_chan.clone()).await; conn.start_pingpong(); + self.global_ctx .issue_event(GlobalCtxEvent::PeerConnAdded(conn.get_conn_info())); self.conns.insert(conn.get_conn_id(), Arc::new(conn)); diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 9c5a78c..35694b4 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -13,7 +13,7 @@ use futures::{StreamExt, TryFutureExt}; use prost::Message; use tokio::{ - sync::{broadcast, mpsc, Mutex}, + sync::{broadcast, Mutex}, task::JoinSet, time::{timeout, Duration}, }; @@ -50,6 +50,41 @@ pub type PeerConnId = uuid::Uuid; const MAGIC: u32 = 0xd1e1a5e1; const VERSION: u32 = 1; +pub struct PeerConnCloseNotify { + conn_id: PeerConnId, + sender: Arc>>>, +} + +impl PeerConnCloseNotify { + fn new(conn_id: PeerConnId) -> Self { + let (sender, _) = broadcast::channel(1); + Self { + conn_id, + sender: Arc::new(std::sync::Mutex::new(Some(sender))), + } + } + + fn notify_close(&self) { + self.sender.lock().unwrap().take(); + } + + pub async fn get_waiter(&self) -> Option> { + if let Some(sender) = self.sender.lock().unwrap().as_mut() { + let receiver = sender.subscribe(); + return Some(receiver); + } + None + } + + pub fn get_conn_id(&self) -> PeerConnId { + self.conn_id + } + + pub fn is_closed(&self) -> bool { + self.sender.lock().unwrap().is_none() + } +} + pub struct PeerConn { conn_id: PeerConnId, @@ -66,7 +101,7 @@ pub struct PeerConn { info: Option, is_client: Option, - close_event_sender: Option>, + close_event_notifier: Arc, ctrl_resp_sender: broadcast::Sender, @@ -88,7 +123,7 @@ impl Debug for PeerConn { impl PeerConn { pub fn new(my_peer_id: PeerId, global_ctx: ArcGlobalCtx, tunnel: Box) -> Self { let tunnel_info = tunnel.info(); - let (ctrl_sender, _ctrl_receiver) = broadcast::channel(100); + let (ctrl_sender, _ctrl_receiver) = broadcast::channel(8); let peer_conn_tunnel_filter = StatsRecorderTunnelFilter::new(); let throughput = peer_conn_tunnel_filter.filter_output(); @@ -97,8 +132,10 @@ impl PeerConn { let (recv, sink) = (mpsc_tunnel.get_stream(), mpsc_tunnel.get_sink()); + let conn_id = PeerConnId::new_v4(); + PeerConn { - conn_id: PeerConnId::new_v4(), + conn_id: conn_id.clone(), my_peer_id, global_ctx, @@ -114,7 +151,8 @@ impl PeerConn { info: None, is_client: None, - close_event_sender: None, + + close_event_notifier: Arc::new(PeerConnCloseNotify::new(conn_id)), ctrl_resp_sender: ctrl_sender, @@ -267,10 +305,8 @@ impl PeerConn { let mut stream = self.recv.lock().await.take().unwrap(); let sink = self.sink.clone(); let sender = packet_recv_chan.clone(); - let close_event_sender = self.close_event_sender.clone().unwrap(); - let conn_id = self.conn_id; + let close_event_notifier = self.close_event_notifier.clone(); let ctrl_sender = self.ctrl_resp_sender.clone(); - let _conn_info = self.get_conn_info(); let conn_info_for_instrument = self.get_conn_info(); self.tasks.spawn( @@ -312,9 +348,7 @@ impl PeerConn { tracing::info!("end recving peer conn packet"); drop(sink); - if let Err(e) = close_event_sender.send(conn_id).await { - tracing::error!(error = ?e, "peer conn close event send error"); - } + close_event_notifier.notify_close(); task_ret } @@ -335,17 +369,14 @@ impl PeerConn { self.throughput.clone(), ); - let close_event_sender = self.close_event_sender.clone().unwrap(); - let conn_id = self.conn_id; + let close_event_notifier = self.close_event_notifier.clone(); self.tasks.spawn(async move { pingpong.pingpong().await; tracing::warn!(?pingpong, "pingpong task exit"); - if let Err(e) = close_event_sender.send(conn_id).await { - tracing::warn!("close event sender error: {:?}", e); - } + close_event_notifier.notify_close(); Ok(()) }); @@ -373,8 +404,8 @@ impl PeerConn { ret } - pub fn set_close_event_sender(&mut self, sender: mpsc::Sender) { - self.close_event_sender = Some(sender); + pub fn get_close_notifier(&self) -> Arc { + self.close_event_notifier.clone() } pub fn get_stats(&self) -> PeerConnStats { @@ -405,6 +436,13 @@ impl PeerConn { } } +impl Drop for PeerConn { + fn drop(&mut self) { + // if someone drop a conn manually, the notifier is not called. + self.close_event_notifier.notify_close(); + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -496,15 +534,13 @@ mod tests { s_peer.do_handshake_as_server() ); - s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0); s_peer.start_recv_loop(create_packet_recv_chan().0).await; // do not start ping for s, s only reponde to ping from c assert!(c_ret.is_ok()); assert!(s_ret.is_ok()); - let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1); - c_peer.set_close_event_sender(close_send); + let close_notifier = c_peer.get_close_notifier(); c_peer.start_pingpong(); c_peer.start_recv_loop(create_packet_recv_chan().0).await; @@ -520,9 +556,9 @@ mod tests { tokio::time::sleep(Duration::from_secs(15)).await; if conn_closed { - assert!(close_recv.try_recv().is_ok()); + assert!(close_notifier.is_closed()); } else { - assert!(close_recv.try_recv().is_err()); + assert!(!close_notifier.is_closed()); } } diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 0bf906f..b06bec5 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -347,21 +347,43 @@ impl PeerManager { async fn start_peer_conn_close_event_handler(&self) { let dmap = self.directly_connected_conn_map.clone(); let mut event_recv = self.global_ctx.subscribe(); + let peer_map = self.peers.clone(); + use tokio::sync::broadcast::error::RecvError; self.tasks.lock().await.spawn(async move { - while let Ok(event) = event_recv.recv().await { - match event { - GlobalCtxEvent::PeerConnRemoved(info) => { - if let Some(set) = dmap.get_mut(&info.peer_id) { - let conn_id = info.conn_id.parse().unwrap(); - let old = set.remove(&conn_id); - tracing::info!( - ?old, - ?info, - "try remove conn id from directly connected map" - ); + loop { + match event_recv.recv().await { + Err(RecvError::Closed) => { + tracing::error!("peer conn close event handler exit"); + break; + } + Err(RecvError::Lagged(_)) => { + tracing::warn!("peer conn close event handler lagged"); + event_recv = event_recv.resubscribe(); + let alive_conns = peer_map.get_alive_conns(); + for p in dmap.iter_mut() { + p.retain(|x| alive_conns.contains_key(&(*p.key(), *x))); + } + dmap.retain(|_, v| !v.is_empty()); + } + Ok(event) => { + if let GlobalCtxEvent::PeerConnRemoved(info) = event { + let mut need_remove = false; + if let Some(set) = dmap.get_mut(&info.peer_id) { + let conn_id = info.conn_id.parse().unwrap(); + let old = set.remove(&conn_id); + tracing::info!( + ?old, + ?info, + "try remove conn id from directly connected map" + ); + need_remove = set.is_empty(); + } + + if need_remove { + dmap.remove(&info.peer_id); + } } } - _ => {} } } }); diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index b6ae05d..47e8e6c 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -27,6 +27,7 @@ pub struct PeerMap { peer_map: DashMap>, packet_send: PacketRecvChan, routes: RwLock>, + alive_conns: Arc>, } impl PeerMap { @@ -37,6 +38,7 @@ impl PeerMap { peer_map: DashMap::new(), packet_send, routes: RwLock::new(Vec::new()), + alive_conns: Arc::new(DashMap::new()), } } @@ -48,6 +50,7 @@ impl PeerMap { } pub async fn add_new_peer_conn(&self, peer_conn: PeerConn) { + self.maintain_alive_conns(&peer_conn); let peer_id = peer_conn.get_peer_id(); let no_entry = self.peer_map.get(&peer_id).is_none(); if no_entry { @@ -60,6 +63,30 @@ impl PeerMap { } } + fn maintain_alive_conns(&self, peer_conn: &PeerConn) { + let close_notifier = peer_conn.get_close_notifier(); + let alive_conns_weak = Arc::downgrade(&self.alive_conns); + let conn_id = close_notifier.get_conn_id(); + let conn_info = peer_conn.get_conn_info(); + self.alive_conns + .insert((conn_info.peer_id, conn_id.clone()), conn_info.clone()); + tokio::spawn(async move { + if let Some(mut waiter) = close_notifier.get_waiter().await { + let _ = waiter.recv().await; + } + let mut alive_conn_count = 0; + if let Some(alive_conns) = alive_conns_weak.upgrade() { + alive_conns.remove(&(conn_info.peer_id, conn_id)).unwrap(); + alive_conn_count = alive_conns.len(); + } + tracing::debug!( + ?conn_id, + "peer conn is closed, current alive conns: {}", + alive_conn_count + ); + }); + } + fn get_peer_by_id(&self, peer_id: PeerId) -> Option> { self.peer_map.get(&peer_id).map(|v| v.clone()) } @@ -284,6 +311,13 @@ impl PeerMap { Ok(!self.has_peer(gateway_id)) } + + pub fn get_alive_conns(&self) -> DashMap<(PeerId, PeerConnId), PeerConnInfo> { + self.alive_conns + .iter() + .map(|v| (v.key().clone(), v.value().clone())) + .collect() + } } impl Drop for PeerMap { diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index b13f837..2eae161 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -233,7 +233,7 @@ async fn subnet_proxy_test_udp() { let udp_connector = UdpTunnelConnector::new("udp://10.1.2.4:22233".parse().unwrap()); // NOTE: this should not excced udp tunnel max buffer size - let mut buf = vec![0; 20 * 1024]; + let mut buf = vec![0; 7 * 1024]; rand::thread_rng().fill(&mut buf[..]); _tunnel_pingpong_netns( @@ -266,7 +266,7 @@ async fn subnet_proxy_test_udp() { let udp_listener = UdpTunnelListener::new("udp://0.0.0.0:22234".parse().unwrap()); let udp_connector = UdpTunnelConnector::new("udp://10.144.144.3:22234".parse().unwrap()); // NOTE: this should not excced udp tunnel max buffer size - let mut buf = vec![0; 20 * 1024]; + let mut buf = vec![0; 7 * 1024]; rand::thread_rng().fill(&mut buf[..]); _tunnel_pingpong_netns( diff --git a/easytier/src/tunnel/common.rs b/easytier/src/tunnel/common.rs index 7d197a3..57b822f 100644 --- a/easytier/src/tunnel/common.rs +++ b/easytier/src/tunnel/common.rs @@ -159,7 +159,7 @@ where reserve_buf( &mut self_mut.buf, *self_mut.max_packet_size, - *self_mut.max_packet_size * 32, + *self_mut.max_packet_size * 2, ); let cap = self_mut.buf.capacity() - self_mut.buf.len(); diff --git a/easytier/src/tunnel/udp.rs b/easytier/src/tunnel/udp.rs index c7f360e..7634f6a 100644 --- a/easytier/src/tunnel/udp.rs +++ b/easytier/src/tunnel/udp.rs @@ -234,7 +234,7 @@ where { let mut buf = BytesMut::new(); loop { - reserve_buf(&mut buf, UDP_DATA_MTU, UDP_DATA_MTU * 16); + reserve_buf(&mut buf, UDP_DATA_MTU, UDP_DATA_MTU * 4); let (dg_size, addr) = match socket.recv_buf_from(&mut buf).await { Ok(v) => v, Err(e) => {