optimize memory issues (#767)
Some checks are pending
EasyTier Core / pre_job (push) Waiting to run
EasyTier Core / build_web (push) Blocked by required conditions
EasyTier Core / build (freebsd-13.2-x86_64, 13.2, ubuntu-22.04, x86_64-unknown-freebsd) (push) Blocked by required conditions
EasyTier Core / build (linux-aarch64, ubuntu-22.04, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-arm, ubuntu-22.04, arm-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armhf, ubuntu-22.04, arm-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7, ubuntu-22.04, armv7-unknown-linux-musleabi) (push) Blocked by required conditions
EasyTier Core / build (linux-armv7hf, ubuntu-22.04, armv7-unknown-linux-musleabihf) (push) Blocked by required conditions
EasyTier Core / build (linux-mips, ubuntu-22.04, mips-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-mipsel, ubuntu-22.04, mipsel-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (linux-x86_64, ubuntu-22.04, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier Core / build (macos-aarch64, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (macos-x86_64, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier Core / build (windows-arm64, windows-latest, aarch64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier Core / build (windows-x86_64, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier Core / core-result (push) Blocked by required conditions
EasyTier GUI / pre_job (push) Waiting to run
EasyTier GUI / build-gui (linux-aarch64, aarch64-unknown-linux-gnu, ubuntu-22.04, aarch64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (linux-x86_64, x86_64-unknown-linux-gnu, ubuntu-22.04, x86_64-unknown-linux-musl) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-aarch64, aarch64-apple-darwin, macos-latest, aarch64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (macos-x86_64, x86_64-apple-darwin, macos-latest, x86_64-apple-darwin) (push) Blocked by required conditions
EasyTier GUI / build-gui (windows-arm64, aarch64-pc-windows-msvc, windows-latest, aarch64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier GUI / build-gui (windows-x86_64, x86_64-pc-windows-msvc, windows-latest, x86_64-pc-windows-msvc) (push) Blocked by required conditions
EasyTier GUI / gui-result (push) Blocked by required conditions
EasyTier Mobile / pre_job (push) Waiting to run
EasyTier Mobile / build-mobile (android, ubuntu-22.04, android) (push) Blocked by required conditions
EasyTier Mobile / mobile-result (push) Blocked by required conditions
EasyTier Test / pre_job (push) Waiting to run
EasyTier Test / test (push) Blocked by required conditions

* optimize memory issues

1. Introduce jemalloc support, which can dump current memory usage.
2. Reduce the GlobalEvent broadcaster's memory usage.
3. Reduce TCP & UDP tunnel memory usage.

TODO: if a peer connection tunnel hangs, the unbounded channel used by peer
RPC may consume a large amount of memory; this should be improved.

* select a port from 15888+ when port is 0
This commit is contained in:
Sijie.Sun
2025-04-09 23:05:49 +08:00
committed by GitHub
parent 3c0d85c9db
commit 01e3ad99ca
16 changed files with 491 additions and 178 deletions

38
Cargo.lock generated
View File

@@ -240,9 +240,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-broadcast"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e"
checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532"
dependencies = [
"event-listener",
"event-listener-strategy",
@@ -1925,6 +1925,9 @@ dependencies = [
"http",
"http_req",
"humansize",
"jemalloc-ctl",
"jemalloc-sys",
"jemallocator",
"kcp-sys",
"machine-uid",
"mimalloc-rust",
@@ -3715,6 +3718,37 @@ dependencies = [
"system-deps",
]
[[package]]
name = "jemalloc-ctl"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cffc705424a344c054e135d12ee591402f4539245e8bbd64e6c9eaa9458b63c"
dependencies = [
"jemalloc-sys",
"libc",
"paste",
]
[[package]]
name = "jemalloc-sys"
version = "0.5.4+5.3.0-patched"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "jemallocator"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc"
dependencies = [
"jemalloc-sys",
"libc",
]
[[package]]
name = "jni"
version = "0.19.0"

View File

@@ -1,6 +1,11 @@
[workspace]
resolver = "2"
members = ["easytier", "easytier-gui/src-tauri", "easytier-rpc-build", "easytier-web"]
members = [
"easytier",
"easytier-gui/src-tauri",
"easytier-rpc-build",
"easytier-web",
]
default-members = ["easytier", "easytier-web"]
[profile.dev]

View File

@@ -93,7 +93,7 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
'udp://0.0.0.0:11010',
'wg://0.0.0.0:11011',
],
rpc_port: 15888,
rpc_port: 0,
latency_first: false,
dev_name: '',

View File

@@ -205,6 +205,14 @@ hickory-resolver = "0.24.4"
bounded_join_set = "0.3.0"
jemallocator = { version = "0.5.4", optional = true }
jemalloc-ctl = { version = "0.5.4", optional = true }
jemalloc-sys = { version = "0.5.4", features = [
"stats",
"profiling",
"unprefixed_malloc_on_supported_platforms",
], optional = true }
[target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies]
machine-uid = "0.5.3"
@@ -280,3 +288,4 @@ websocket = [
]
smoltcp = ["dep:smoltcp", "dep:parking_lot"]
socks5 = ["dep:smoltcp"]
jemalloc = ["dep:jemallocator", "dep:jemalloc-ctl", "dep:jemalloc-sys"]

View File

@@ -97,7 +97,7 @@ impl GlobalCtx {
let net_ns = NetNS::new(config_fs.get_netns());
let hostname = config_fs.get_hostname();
let (event_bus, _) = tokio::sync::broadcast::channel(1024);
let (event_bus, _) = tokio::sync::broadcast::channel(8);
let stun_info_collection = Arc::new(StunInfoCollector::new_with_default_servers());
@@ -141,10 +141,13 @@ impl GlobalCtx {
}
pub fn issue_event(&self, event: GlobalCtxEvent) {
if self.event_bus.receiver_count() != 0 {
self.event_bus.send(event).unwrap();
} else {
tracing::warn!("No subscriber for event: {:?}", event);
if let Err(e) = self.event_bus.send(event.clone()) {
tracing::warn!(
"Failed to send event: {:?}, error: {:?}, receiver count: {}",
event,
e,
self.event_bus.receiver_count()
);
}
}

View File

@@ -3,7 +3,10 @@ use std::{collections::BTreeSet, sync::Arc};
use anyhow::Context;
use dashmap::{DashMap, DashSet};
use tokio::{
sync::{broadcast::Receiver, mpsc, Mutex},
sync::{
broadcast::{error::RecvError, Receiver},
mpsc, Mutex,
},
task::JoinSet,
time::timeout,
};
@@ -179,8 +182,37 @@ impl ManualConnectorManager {
mut event_recv: Receiver<GlobalCtxEvent>,
) {
loop {
let event = event_recv.recv().await.expect("event_recv got error");
Self::handle_event(&event, &data).await;
match event_recv.recv().await {
Ok(event) => {
Self::handle_event(&event, &data).await;
}
Err(RecvError::Lagged(n)) => {
tracing::warn!("event_recv lagged: {}, rebuild alive conn list", n);
event_recv = event_recv.resubscribe();
data.alive_conn_urls.clear();
for x in data
.peer_manager
.get_peer_map()
.get_alive_conns()
.iter()
.map(|x| {
x.tunnel
.clone()
.unwrap_or_default()
.remote_addr
.unwrap_or_default()
.to_string()
})
{
data.alive_conn_urls.insert(x);
}
continue;
}
Err(RecvError::Closed) => {
tracing::warn!("event_recv closed, exit");
break;
}
}
}
}
@@ -271,7 +303,6 @@ impl ManualConnectorManager {
async fn collect_dead_conns(data: Arc<ConnectorManagerData>) -> BTreeSet<String> {
Self::handle_remove_connector(data.clone());
let all_urls: BTreeSet<String> = data
.connectors
.iter()

View File

@@ -6,12 +6,12 @@ extern crate rust_i18n;
use std::{
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
process::ExitCode,
sync::Arc,
};
use anyhow::Context;
use clap::Parser;
use tokio::net::TcpSocket;
use easytier::{
common::{
@@ -38,13 +38,58 @@ use easytier::{
#[cfg(target_os = "windows")]
windows_service::define_windows_service!(ffi_service_main, win_service_main);
#[cfg(feature = "mimalloc")]
#[cfg(all(feature = "mimalloc", not(feature = "jemalloc")))]
use mimalloc_rust::GlobalMiMalloc;
#[cfg(feature = "mimalloc")]
#[cfg(all(feature = "mimalloc", not(feature = "jemalloc")))]
#[global_allocator]
static GLOBAL_MIMALLOC: GlobalMiMalloc = GlobalMiMalloc;
#[cfg(feature = "jemalloc")]
use jemalloc_ctl::{epoch, stats, Access as _, AsName as _};
#[cfg(feature = "jemalloc")]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
/// Enable or disable jemalloc heap profiling at runtime via mallctl.
///
/// No-op unless the `jemalloc` feature is enabled; the parameter is
/// underscore-prefixed so the non-jemalloc build compiles without an
/// unused-variable warning.
fn set_prof_active(_active: bool) {
    #[cfg(feature = "jemalloc")]
    {
        // jemalloc mallctl key; must be NUL-terminated.
        const PROF_ACTIVE: &'static [u8] = b"prof.active\0";
        let name = PROF_ACTIVE.name();
        // NOTE(review): this presumably panics if jemalloc was built without
        // profiling support — confirm the `profiling` feature is always on
        // when `jemalloc` is enabled (it is in this crate's Cargo.toml).
        name.write(_active).expect("Should succeed to set prof");
    }
}
/// Dump a jemalloc heap profile to a timestamped file in the working
/// directory.
///
/// `_cur_allocated` (current heap size in bytes) is embedded in the file
/// name so successive dumps can be correlated with the heap size that
/// triggered them. No-op unless the `jemalloc` feature is enabled.
fn dump_profile(_cur_allocated: usize) {
    #[cfg(feature = "jemalloc")]
    {
        // jemalloc mallctl key; must be NUL-terminated.
        const PROF_DUMP: &[u8] = b"prof.dump\0";
        let file_name_str = format!(
            "profile-{}-{}.out",
            _cur_allocated,
            chrono::Local::now().format("%Y-%m-%d-%H-%M-%S")
        );
        let file_name = file_name_str.as_bytes();
        // The buffer only needs to outlive the synchronous mallctl write
        // below, so a stack-local array replaces the previous `static mut`
        // (which required `unsafe` and was unsound if two threads dumped
        // concurrently). One spare byte is kept for the NUL terminator.
        let mut buf = [0u8; 128];
        if file_name.len() >= buf.len() {
            panic!("file name too long");
        }
        buf[..file_name.len()].copy_from_slice(file_name);
        // Pass the name plus its trailing NUL terminator, as mallctl
        // expects a C string.
        let name = PROF_DUMP.name();
        name.write(&buf[..file_name.len() + 1])
            .expect("Should succeed to dump profile");
        println!("dump profile to: {}", file_name_str);
    }
}
#[derive(Parser, Debug)]
#[command(name = "easytier-core", author, version = EASYTIER_VERSION , about, long_about = None)]
struct Cli {
@@ -116,7 +161,7 @@ struct Cli {
short,
long,
help = t!("core_clap.rpc_portal").to_string(),
default_value = "15888"
default_value = "0"
)]
rpc_portal: String,
@@ -391,22 +436,8 @@ impl Cli {
Ok(listeners)
}
/// Probe whether TCP `port` can currently be bound on 0.0.0.0.
///
/// Returns the probed `SocketAddr` when the bind succeeds, `None`
/// otherwise. The probe socket is dropped immediately, so this is
/// inherently racy (TOCTOU): another process may take the port before the
/// real RPC listener binds it.
fn check_tcp_available(port: u16) -> Option<SocketAddr> {
    let s = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();
    TcpSocket::new_v4().unwrap().bind(s).map(|_| s).ok()
}
fn parse_rpc_portal(rpc_portal: String) -> anyhow::Result<SocketAddr> {
if let Ok(port) = rpc_portal.parse::<u16>() {
if port == 0 {
// check tcp 15888 first
for i in 15888..15900 {
if let Some(s) = Cli::check_tcp_available(i) {
return Ok(s);
}
}
return Ok("0.0.0.0:0".parse().unwrap());
}
return Ok(format!("0.0.0.0:{}", port).parse().unwrap());
}
@@ -652,114 +683,118 @@ fn peer_conn_info_to_string(p: proto::cli::PeerConnInfo) -> String {
#[tracing::instrument]
pub fn handle_event(mut events: EventBusSubscriber) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Ok(e) = events.recv().await {
match e {
GlobalCtxEvent::PeerAdded(p) => {
print_event(format!("new peer added. peer_id: {}", p));
}
GlobalCtxEvent::PeerRemoved(p) => {
print_event(format!("peer removed. peer_id: {}", p));
}
GlobalCtxEvent::PeerConnAdded(p) => {
print_event(format!(
"new peer connection added. conn_info: {}",
peer_conn_info_to_string(p)
));
}
GlobalCtxEvent::PeerConnRemoved(p) => {
print_event(format!(
"peer connection removed. conn_info: {}",
peer_conn_info_to_string(p)
));
}
GlobalCtxEvent::ListenerAddFailed(p, msg) => {
print_event(format!(
"listener add failed. listener: {}, msg: {}",
p, msg
));
}
GlobalCtxEvent::ListenerAcceptFailed(p, msg) => {
print_event(format!(
"listener accept failed. listener: {}, msg: {}",
p, msg
));
}
GlobalCtxEvent::ListenerAdded(p) => {
if p.scheme() == "ring" {
continue;
loop {
if let Ok(e) = events.recv().await {
match e {
GlobalCtxEvent::PeerAdded(p) => {
print_event(format!("new peer added. peer_id: {}", p));
}
print_event(format!("new listener added. listener: {}", p));
}
GlobalCtxEvent::ConnectionAccepted(local, remote) => {
print_event(format!(
"new connection accepted. local: {}, remote: {}",
local, remote
));
}
GlobalCtxEvent::PeerRemoved(p) => {
print_event(format!("peer removed. peer_id: {}", p));
}
GlobalCtxEvent::ConnectionError(local, remote, err) => {
print_event(format!(
"connection error. local: {}, remote: {}, err: {}",
local, remote, err
));
}
GlobalCtxEvent::PeerConnAdded(p) => {
print_event(format!(
"new peer connection added. conn_info: {}",
peer_conn_info_to_string(p)
));
}
GlobalCtxEvent::TunDeviceReady(dev) => {
print_event(format!("tun device ready. dev: {}", dev));
}
GlobalCtxEvent::PeerConnRemoved(p) => {
print_event(format!(
"peer connection removed. conn_info: {}",
peer_conn_info_to_string(p)
));
}
GlobalCtxEvent::TunDeviceError(err) => {
print_event(format!("tun device error. err: {}", err));
}
GlobalCtxEvent::ListenerAddFailed(p, msg) => {
print_event(format!(
"listener add failed. listener: {}, msg: {}",
p, msg
));
}
GlobalCtxEvent::Connecting(dst) => {
print_event(format!("connecting to peer. dst: {}", dst));
}
GlobalCtxEvent::ListenerAcceptFailed(p, msg) => {
print_event(format!(
"listener accept failed. listener: {}, msg: {}",
p, msg
));
}
GlobalCtxEvent::ConnectError(dst, ip_version, err) => {
print_event(format!(
"connect to peer error. dst: {}, ip_version: {}, err: {}",
dst, ip_version, err
));
}
GlobalCtxEvent::ListenerAdded(p) => {
if p.scheme() == "ring" {
continue;
}
print_event(format!("new listener added. listener: {}", p));
}
GlobalCtxEvent::VpnPortalClientConnected(portal, client_addr) => {
print_event(format!(
"vpn portal client connected. portal: {}, client_addr: {}",
portal, client_addr
));
}
GlobalCtxEvent::ConnectionAccepted(local, remote) => {
print_event(format!(
"new connection accepted. local: {}, remote: {}",
local, remote
));
}
GlobalCtxEvent::VpnPortalClientDisconnected(portal, client_addr) => {
print_event(format!(
"vpn portal client disconnected. portal: {}, client_addr: {}",
portal, client_addr
));
}
GlobalCtxEvent::ConnectionError(local, remote, err) => {
print_event(format!(
"connection error. local: {}, remote: {}, err: {}",
local, remote, err
));
}
GlobalCtxEvent::DhcpIpv4Changed(old, new) => {
print_event(format!("dhcp ip changed. old: {:?}, new: {:?}", old, new));
}
GlobalCtxEvent::TunDeviceReady(dev) => {
print_event(format!("tun device ready. dev: {}", dev));
}
GlobalCtxEvent::DhcpIpv4Conflicted(ip) => {
print_event(format!("dhcp ip conflict. ip: {:?}", ip));
}
GlobalCtxEvent::TunDeviceError(err) => {
print_event(format!("tun device error. err: {}", err));
}
GlobalCtxEvent::PortForwardAdded(cfg) => {
print_event(format!(
"port forward added. local: {}, remote: {}, proto: {}",
cfg.bind_addr.unwrap().to_string(),
cfg.dst_addr.unwrap().to_string(),
cfg.socket_type().as_str_name()
));
GlobalCtxEvent::Connecting(dst) => {
print_event(format!("connecting to peer. dst: {}", dst));
}
GlobalCtxEvent::ConnectError(dst, ip_version, err) => {
print_event(format!(
"connect to peer error. dst: {}, ip_version: {}, err: {}",
dst, ip_version, err
));
}
GlobalCtxEvent::VpnPortalClientConnected(portal, client_addr) => {
print_event(format!(
"vpn portal client connected. portal: {}, client_addr: {}",
portal, client_addr
));
}
GlobalCtxEvent::VpnPortalClientDisconnected(portal, client_addr) => {
print_event(format!(
"vpn portal client disconnected. portal: {}, client_addr: {}",
portal, client_addr
));
}
GlobalCtxEvent::DhcpIpv4Changed(old, new) => {
print_event(format!("dhcp ip changed. old: {:?}, new: {:?}", old, new));
}
GlobalCtxEvent::DhcpIpv4Conflicted(ip) => {
print_event(format!("dhcp ip conflict. ip: {:?}", ip));
}
GlobalCtxEvent::PortForwardAdded(cfg) => {
print_event(format!(
"port forward added. local: {}, remote: {}, proto: {}",
cfg.bind_addr.unwrap().to_string(),
cfg.dst_addr.unwrap().to_string(),
cfg.socket_type().as_str_name()
));
}
}
} else {
events = events.resubscribe();
}
}
})
@@ -940,14 +975,61 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
let mut l = launcher::NetworkInstance::new(cfg).set_fetch_node_info(false);
let _t = ScopedTask::from(handle_event(l.start().unwrap()));
if let Some(e) = l.wait().await {
anyhow::bail!("launcher error: {}", e);
tokio::select! {
e = l.wait() => {
if let Some(e) = e {
eprintln!("launcher error: {}", e);
}
}
_ = tokio::signal::ctrl_c() => {
println!("ctrl-c received, exiting...");
}
}
Ok(())
}
/// Background-thread body: periodically samples jemalloc's allocated-bytes
/// statistic and triggers a heap-profile dump whenever the heap grows more
/// than 75 MiB beyond the size recorded at the previous dump.
///
/// Returns immediately (no-op) unless the `jemalloc` feature is enabled;
/// otherwise it loops forever and never returns.
fn memory_monitor() {
    #[cfg(feature = "jemalloc")]
    {
        // Heap size recorded at the last dump; 0 means "not yet initialized".
        let mut last_peak_size = 0;
        // Resolve the mallctl MIBs once up front. jemalloc requires
        // advancing the epoch before each read so the stats are refreshed.
        let e = epoch::mib().unwrap();
        let allocated_stats = stats::allocated::mib().unwrap();
        loop {
            e.advance().unwrap();
            let new_heap_size = allocated_stats.read().unwrap();
            println!(
                "heap size: {} bytes, time: {}",
                new_heap_size,
                chrono::Local::now().format("%Y-%m-%d %H:%M:%S")
            );
            // dump every 75MB
            if last_peak_size > 0
                && new_heap_size > last_peak_size
                && new_heap_size - last_peak_size > 75 * 1024 * 1024
            {
                println!(
                    "heap size increased: {} bytes, time: {}",
                    new_heap_size - last_peak_size,
                    chrono::Local::now().format("%Y-%m-%d %H:%M:%S")
                );
                dump_profile(new_heap_size);
                // Move the baseline up so the next dump needs another 75 MiB
                // of growth on top of this one.
                last_peak_size = new_heap_size;
            }
            // First successful sample initializes the baseline.
            if last_peak_size == 0 {
                last_peak_size = new_heap_size;
            }
            // Sample every 5 seconds.
            std::thread::sleep(std::time::Duration::from_secs(5));
        }
    }
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
async fn main() -> ExitCode {
let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US"));
rust_i18n::set_locale(&locale);
setup_panic_handler();
@@ -968,10 +1050,21 @@ async fn main() {
}
};
set_prof_active(true);
let _monitor = std::thread::spawn(memory_monitor);
let cli = Cli::parse();
let mut ret_code = 0;
if let Err(e) = run_main(cli).await {
eprintln!("error: {:?}", e);
std::process::exit(1);
ret_code = 1;
}
println!("Stopping easytier...");
dump_profile(0);
set_prof_active(false);
ExitCode::from(ret_code)
}

View File

@@ -65,7 +65,7 @@ impl Stream for TunStream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<StreamItem>> {
let mut self_mut = self.project();
let mut g = ready!(self_mut.l.poll_lock(cx));
reserve_buf(&mut self_mut.cur_buf, 2500, 32 * 1024);
reserve_buf(&mut self_mut.cur_buf, 2500, 4 * 1024);
if self_mut.cur_buf.len() == 0 {
unsafe {
self_mut.cur_buf.set_len(*self_mut.payload_offset);

View File

@@ -1,5 +1,6 @@
use std::{
collections::VecDeque,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc, RwLock},
};
@@ -42,7 +43,7 @@ struct EasyTierData {
impl Default for EasyTierData {
fn default() -> Self {
let (tx, _) = broadcast::channel(100);
let (tx, _) = broadcast::channel(16);
Self {
event_subscriber: RwLock::new(tx),
events: RwLock::new(VecDeque::new()),
@@ -144,8 +145,19 @@ impl EasyTierLauncher {
let data_c = data.clone();
tasks.spawn(async move {
let mut receiver = global_ctx.subscribe();
while let Ok(event) = receiver.recv().await {
Self::handle_easytier_event(event, &data_c).await;
loop {
match receiver.recv().await {
Ok(event) => {
Self::handle_easytier_event(event.clone(), &data_c).await;
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
// do nothing currently
receiver = receiver.resubscribe();
}
}
}
});
@@ -202,6 +214,27 @@ impl EasyTierLauncher {
Ok(())
}
/// Probe whether TCP `port` can currently be bound on 0.0.0.0.
///
/// The probe listener is dropped immediately, so the result is advisory:
/// another process may still take the port before we bind it for real.
fn check_tcp_available(port: u16) -> bool {
    // Build the address directly instead of formatting and re-parsing a
    // string, which removes an allocation and an `unwrap()` panic path.
    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    std::net::TcpListener::bind(addr).is_ok()
}
/// If the configured RPC portal uses port 0 ("auto"), pick the first free
/// TCP port in the range 15888..15900 and write it back into the config.
/// Leaves the config untouched when a concrete port is already set, when
/// no RPC portal is configured, or when the whole range is occupied.
fn select_proper_rpc_port(cfg: &TomlConfigLoader) {
    let Some(mut portal) = cfg.get_rpc_portal() else {
        return;
    };
    // A non-zero port was chosen explicitly by the user; respect it.
    if portal.port() != 0 {
        return;
    }
    // Probe the preferred range and take the first bindable port.
    if let Some(port) = (15888..15900).find(|p| Self::check_tcp_available(*p)) {
        portal.set_port(port);
        cfg.set_rpc_portal(portal);
    }
}
pub fn start<F>(&mut self, cfg_generator: F)
where
F: FnOnce() -> Result<TomlConfigLoader, anyhow::Error> + Send + Sync,
@@ -217,6 +250,8 @@ impl EasyTierLauncher {
self.running_cfg = cfg.dump();
Self::select_proper_rpc_port(&cfg);
let stop_flag = self.stop_flag.clone();
let instance_alive = self.instance_alive.clone();
@@ -522,7 +557,8 @@ impl NetworkConfig {
let mut routes = Vec::<cidr::Ipv4Cidr>::with_capacity(self.routes.len());
for route in self.routes.iter() {
routes.push(
route.parse()
route
.parse()
.with_context(|| format!("failed to parse route: {}", route))?,
);
}
@@ -543,9 +579,7 @@ impl NetworkConfig {
if self.enable_socks5.unwrap_or_default() {
if let Some(socks5_port) = self.socks5_port {
cfg.set_socks5_portal(Some(
format!("socks5://0.0.0.0:{}", socks5_port)
.parse()
.unwrap(),
format!("socks5://0.0.0.0:{}", socks5_port).parse().unwrap(),
));
}
}

View File

@@ -118,9 +118,21 @@ impl Peer {
}
pub async fn add_peer_conn(&self, mut conn: PeerConn) {
conn.set_close_event_sender(self.close_event_sender.clone());
let close_event_sender = self.close_event_sender.clone();
let close_notifier = conn.get_close_notifier();
tokio::spawn(async move {
let conn_id = close_notifier.get_conn_id();
if let Some(mut waiter) = close_notifier.get_waiter().await {
let _ = waiter.recv().await;
}
if let Err(e) = close_event_sender.send(conn_id).await {
tracing::warn!(?conn_id, "failed to send close event: {}", e);
}
});
conn.start_recv_loop(self.packet_recv_chan.clone()).await;
conn.start_pingpong();
self.global_ctx
.issue_event(GlobalCtxEvent::PeerConnAdded(conn.get_conn_info()));
self.conns.insert(conn.get_conn_id(), Arc::new(conn));

View File

@@ -13,7 +13,7 @@ use futures::{StreamExt, TryFutureExt};
use prost::Message;
use tokio::{
sync::{broadcast, mpsc, Mutex},
sync::{broadcast, Mutex},
task::JoinSet,
time::{timeout, Duration},
};
@@ -50,6 +50,41 @@ pub type PeerConnId = uuid::Uuid;
const MAGIC: u32 = 0xd1e1a5e1;
const VERSION: u32 = 1;
/// One-shot close signal for a peer connection.
///
/// Holds the sending half of a broadcast channel inside an `Option`:
/// "closed" is represented by taking the sender out (dropping it), which
/// wakes every receiver obtained via [`get_waiter`] with a channel-closed
/// error. A `std::sync::Mutex` guards the option; it is only held for
/// short, non-awaiting critical sections, so using it from async code is
/// safe here.
pub struct PeerConnCloseNotify {
    // Id of the connection this notifier belongs to.
    conn_id: PeerConnId,
    // `Some(sender)` while the conn is alive; `None` once closed.
    sender: Arc<std::sync::Mutex<Option<broadcast::Sender<()>>>>,
}

impl PeerConnCloseNotify {
    /// Create a notifier in the "open" state for `conn_id`.
    fn new(conn_id: PeerConnId) -> Self {
        // Capacity 1 is enough: the channel is never sent on; closing it
        // (by dropping the sender) is the only signal.
        let (sender, _) = broadcast::channel(1);
        Self {
            conn_id,
            sender: Arc::new(std::sync::Mutex::new(Some(sender))),
        }
    }

    /// Mark the connection closed. Dropping the sender wakes all waiters.
    /// Idempotent: taking an already-empty option is a no-op.
    fn notify_close(&self) {
        self.sender.lock().unwrap().take();
    }

    /// Subscribe a waiter for the close signal.
    ///
    /// Returns `None` when the connection is already closed (callers then
    /// skip waiting). Note the function does not actually await anything;
    /// the `async` is kept for API symmetry with its call sites.
    pub async fn get_waiter(&self) -> Option<broadcast::Receiver<()>> {
        if let Some(sender) = self.sender.lock().unwrap().as_mut() {
            let receiver = sender.subscribe();
            return Some(receiver);
        }
        None
    }

    /// Id of the connection this notifier tracks.
    pub fn get_conn_id(&self) -> PeerConnId {
        self.conn_id
    }

    /// Whether `notify_close` has already been called.
    pub fn is_closed(&self) -> bool {
        self.sender.lock().unwrap().is_none()
    }
}
pub struct PeerConn {
conn_id: PeerConnId,
@@ -66,7 +101,7 @@ pub struct PeerConn {
info: Option<HandshakeRequest>,
is_client: Option<bool>,
close_event_sender: Option<mpsc::Sender<PeerConnId>>,
close_event_notifier: Arc<PeerConnCloseNotify>,
ctrl_resp_sender: broadcast::Sender<ZCPacket>,
@@ -88,7 +123,7 @@ impl Debug for PeerConn {
impl PeerConn {
pub fn new(my_peer_id: PeerId, global_ctx: ArcGlobalCtx, tunnel: Box<dyn Tunnel>) -> Self {
let tunnel_info = tunnel.info();
let (ctrl_sender, _ctrl_receiver) = broadcast::channel(100);
let (ctrl_sender, _ctrl_receiver) = broadcast::channel(8);
let peer_conn_tunnel_filter = StatsRecorderTunnelFilter::new();
let throughput = peer_conn_tunnel_filter.filter_output();
@@ -97,8 +132,10 @@ impl PeerConn {
let (recv, sink) = (mpsc_tunnel.get_stream(), mpsc_tunnel.get_sink());
let conn_id = PeerConnId::new_v4();
PeerConn {
conn_id: PeerConnId::new_v4(),
conn_id: conn_id.clone(),
my_peer_id,
global_ctx,
@@ -114,7 +151,8 @@ impl PeerConn {
info: None,
is_client: None,
close_event_sender: None,
close_event_notifier: Arc::new(PeerConnCloseNotify::new(conn_id)),
ctrl_resp_sender: ctrl_sender,
@@ -267,10 +305,8 @@ impl PeerConn {
let mut stream = self.recv.lock().await.take().unwrap();
let sink = self.sink.clone();
let sender = packet_recv_chan.clone();
let close_event_sender = self.close_event_sender.clone().unwrap();
let conn_id = self.conn_id;
let close_event_notifier = self.close_event_notifier.clone();
let ctrl_sender = self.ctrl_resp_sender.clone();
let _conn_info = self.get_conn_info();
let conn_info_for_instrument = self.get_conn_info();
self.tasks.spawn(
@@ -312,9 +348,7 @@ impl PeerConn {
tracing::info!("end recving peer conn packet");
drop(sink);
if let Err(e) = close_event_sender.send(conn_id).await {
tracing::error!(error = ?e, "peer conn close event send error");
}
close_event_notifier.notify_close();
task_ret
}
@@ -335,17 +369,14 @@ impl PeerConn {
self.throughput.clone(),
);
let close_event_sender = self.close_event_sender.clone().unwrap();
let conn_id = self.conn_id;
let close_event_notifier = self.close_event_notifier.clone();
self.tasks.spawn(async move {
pingpong.pingpong().await;
tracing::warn!(?pingpong, "pingpong task exit");
if let Err(e) = close_event_sender.send(conn_id).await {
tracing::warn!("close event sender error: {:?}", e);
}
close_event_notifier.notify_close();
Ok(())
});
@@ -373,8 +404,8 @@ impl PeerConn {
ret
}
pub fn set_close_event_sender(&mut self, sender: mpsc::Sender<PeerConnId>) {
self.close_event_sender = Some(sender);
pub fn get_close_notifier(&self) -> Arc<PeerConnCloseNotify> {
self.close_event_notifier.clone()
}
pub fn get_stats(&self) -> PeerConnStats {
@@ -405,6 +436,13 @@ impl PeerConn {
}
}
impl Drop for PeerConn {
    fn drop(&mut self) {
        // Fire the close notification on drop as well: if someone drops a
        // conn manually (without the recv loop or pingpong task observing
        // the close), waiters would otherwise never be woken.
        self.close_event_notifier.notify_close();
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -496,15 +534,13 @@ mod tests {
s_peer.do_handshake_as_server()
);
s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0);
s_peer.start_recv_loop(create_packet_recv_chan().0).await;
// do not start ping for s, s only reponde to ping from c
assert!(c_ret.is_ok());
assert!(s_ret.is_ok());
let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1);
c_peer.set_close_event_sender(close_send);
let close_notifier = c_peer.get_close_notifier();
c_peer.start_pingpong();
c_peer.start_recv_loop(create_packet_recv_chan().0).await;
@@ -520,9 +556,9 @@ mod tests {
tokio::time::sleep(Duration::from_secs(15)).await;
if conn_closed {
assert!(close_recv.try_recv().is_ok());
assert!(close_notifier.is_closed());
} else {
assert!(close_recv.try_recv().is_err());
assert!(!close_notifier.is_closed());
}
}

View File

@@ -347,21 +347,43 @@ impl PeerManager {
async fn start_peer_conn_close_event_handler(&self) {
let dmap = self.directly_connected_conn_map.clone();
let mut event_recv = self.global_ctx.subscribe();
let peer_map = self.peers.clone();
use tokio::sync::broadcast::error::RecvError;
self.tasks.lock().await.spawn(async move {
while let Ok(event) = event_recv.recv().await {
match event {
GlobalCtxEvent::PeerConnRemoved(info) => {
if let Some(set) = dmap.get_mut(&info.peer_id) {
let conn_id = info.conn_id.parse().unwrap();
let old = set.remove(&conn_id);
tracing::info!(
?old,
?info,
"try remove conn id from directly connected map"
);
loop {
match event_recv.recv().await {
Err(RecvError::Closed) => {
tracing::error!("peer conn close event handler exit");
break;
}
Err(RecvError::Lagged(_)) => {
tracing::warn!("peer conn close event handler lagged");
event_recv = event_recv.resubscribe();
let alive_conns = peer_map.get_alive_conns();
for p in dmap.iter_mut() {
p.retain(|x| alive_conns.contains_key(&(*p.key(), *x)));
}
dmap.retain(|_, v| !v.is_empty());
}
Ok(event) => {
if let GlobalCtxEvent::PeerConnRemoved(info) = event {
let mut need_remove = false;
if let Some(set) = dmap.get_mut(&info.peer_id) {
let conn_id = info.conn_id.parse().unwrap();
let old = set.remove(&conn_id);
tracing::info!(
?old,
?info,
"try remove conn id from directly connected map"
);
need_remove = set.is_empty();
}
if need_remove {
dmap.remove(&info.peer_id);
}
}
}
_ => {}
}
}
});

View File

@@ -27,6 +27,7 @@ pub struct PeerMap {
peer_map: DashMap<PeerId, Arc<Peer>>,
packet_send: PacketRecvChan,
routes: RwLock<Vec<ArcRoute>>,
alive_conns: Arc<DashMap<(PeerId, PeerConnId), PeerConnInfo>>,
}
impl PeerMap {
@@ -37,6 +38,7 @@ impl PeerMap {
peer_map: DashMap::new(),
packet_send,
routes: RwLock::new(Vec::new()),
alive_conns: Arc::new(DashMap::new()),
}
}
@@ -48,6 +50,7 @@ impl PeerMap {
}
pub async fn add_new_peer_conn(&self, peer_conn: PeerConn) {
self.maintain_alive_conns(&peer_conn);
let peer_id = peer_conn.get_peer_id();
let no_entry = self.peer_map.get(&peer_id).is_none();
if no_entry {
@@ -60,6 +63,30 @@ impl PeerMap {
}
}
/// Register `peer_conn` in the alive-connection bookkeeping map and spawn
/// a task that removes the entry once the connection's close notifier
/// fires.
///
/// The task holds only a `Weak` reference to the map so it cannot keep the
/// `PeerMap` alive after the map itself is dropped.
fn maintain_alive_conns(&self, peer_conn: &PeerConn) {
    let close_notifier = peer_conn.get_close_notifier();
    // Weak ref: the reaper task must not extend the map's lifetime.
    let alive_conns_weak = Arc::downgrade(&self.alive_conns);
    let conn_id = close_notifier.get_conn_id();
    let conn_info = peer_conn.get_conn_info();
    // Insert BEFORE spawning the reaper so the entry is guaranteed to
    // exist when the task later removes it.
    self.alive_conns
        .insert((conn_info.peer_id, conn_id.clone()), conn_info.clone());
    tokio::spawn(async move {
        // `None` means the conn is already closed; otherwise block until
        // the waiter's channel is closed by notify_close.
        if let Some(mut waiter) = close_notifier.get_waiter().await {
            let _ = waiter.recv().await;
        }
        let mut alive_conn_count = 0;
        if let Some(alive_conns) = alive_conns_weak.upgrade() {
            // unwrap: the entry was inserted above and only this task
            // removes it, so it must still be present.
            alive_conns.remove(&(conn_info.peer_id, conn_id)).unwrap();
            alive_conn_count = alive_conns.len();
        }
        tracing::debug!(
            ?conn_id,
            "peer conn is closed, current alive conns: {}",
            alive_conn_count
        );
    });
}
fn get_peer_by_id(&self, peer_id: PeerId) -> Option<Arc<Peer>> {
self.peer_map.get(&peer_id).map(|v| v.clone())
}
@@ -284,6 +311,13 @@ impl PeerMap {
Ok(!self.has_peer(gateway_id))
}
/// Snapshot the currently-alive peer connections, keyed by
/// `(peer_id, conn_id)`.
///
/// Entries are cloned into a fresh `DashMap`, so the returned map is
/// detached from the live bookkeeping and safe to iterate at leisure.
pub fn get_alive_conns(&self) -> DashMap<(PeerId, PeerConnId), PeerConnInfo> {
    self.alive_conns
        .iter()
        .map(|v| (v.key().clone(), v.value().clone()))
        .collect()
}
}
impl Drop for PeerMap {

View File

@@ -233,7 +233,7 @@ async fn subnet_proxy_test_udp() {
let udp_connector = UdpTunnelConnector::new("udp://10.1.2.4:22233".parse().unwrap());
// NOTE: this should not exceed the UDP tunnel max buffer size
let mut buf = vec![0; 20 * 1024];
let mut buf = vec![0; 7 * 1024];
rand::thread_rng().fill(&mut buf[..]);
_tunnel_pingpong_netns(
@@ -266,7 +266,7 @@ async fn subnet_proxy_test_udp() {
let udp_listener = UdpTunnelListener::new("udp://0.0.0.0:22234".parse().unwrap());
let udp_connector = UdpTunnelConnector::new("udp://10.144.144.3:22234".parse().unwrap());
// NOTE: this should not exceed the UDP tunnel max buffer size
let mut buf = vec![0; 20 * 1024];
let mut buf = vec![0; 7 * 1024];
rand::thread_rng().fill(&mut buf[..]);
_tunnel_pingpong_netns(

View File

@@ -159,7 +159,7 @@ where
reserve_buf(
&mut self_mut.buf,
*self_mut.max_packet_size,
*self_mut.max_packet_size * 32,
*self_mut.max_packet_size * 2,
);
let cap = self_mut.buf.capacity() - self_mut.buf.len();

View File

@@ -234,7 +234,7 @@ where
{
let mut buf = BytesMut::new();
loop {
reserve_buf(&mut buf, UDP_DATA_MTU, UDP_DATA_MTU * 16);
reserve_buf(&mut buf, UDP_DATA_MTU, UDP_DATA_MTU * 4);
let (dg_size, addr) = match socket.recv_buf_from(&mut buf).await {
Ok(v) => v,
Err(e) => {