diff --git a/.github/workflows/gui.yml b/.github/workflows/gui.yml index eb30f81..ffa6be9 100644 --- a/.github/workflows/gui.yml +++ b/.github/workflows/gui.yml @@ -152,6 +152,13 @@ jobs: sudo dpkg --add-architecture arm64 sudo apt-get update + sudo apt-get install -y libgstreamer1.0-0:arm64 + sudo apt-get install -y gstreamer1.0-plugins-base:arm64 + sudo apt-get install -y gstreamer1.0-plugins-good:arm64 + sudo apt-get install -y libgstreamer-gl1.0-0:arm64 + sudo apt-get install -y libgstreamer-plugins-base1.0-0:arm64 + sudo apt-get install -y libgstreamer-plugins-good1.0-0:arm64 + sudo apt-get install -y libwebkit2gtk-4.1-0:arm64 sudo apt install -f -o Dpkg::Options::="--force-overwrite" libwebkit2gtk-4.1-dev:arm64 libssl-dev:arm64 gcc-aarch64-linux-gnu echo "PKG_CONFIG_SYSROOT_DIR=/usr/aarch64-linux-gnu/" >> "$GITHUB_ENV" echo "PKG_CONFIG_PATH=/usr/lib/aarch64-linux-gnu/pkgconfig/" >> "$GITHUB_ENV" diff --git a/easytier/src/connector/udp_hole_punch/common.rs b/easytier/src/connector/udp_hole_punch/common.rs index 35162e7..2c6637b 100644 --- a/easytier/src/connector/udp_hole_punch/common.rs +++ b/easytier/src/connector/udp_hole_punch/common.rs @@ -26,7 +26,7 @@ use crate::{ }, }; -pub(crate) const HOLE_PUNCH_PACKET_BODY_LEN: u16 = 16; +pub(crate) const HOLE_PUNCH_PACKET_BODY_LEN: u16 = 32; fn generate_shuffled_port_vec() -> Vec { let mut rng = rand::thread_rng(); @@ -285,9 +285,16 @@ impl UdpSocketArray { pub async fn send_with_all(&self, data: &[u8], addr: SocketAddr) -> Result<(), anyhow::Error> { tracing::info!(?addr, "sending hole punching packet"); - for socket in self.sockets.iter() { - let socket = socket.value(); - socket.send_to(data, addr).await?; + let sockets = self + .sockets + .iter() + .map(|s| s.value().clone()) + .collect::>(); + + for socket in sockets.iter() { + for _ in 0..3 { + socket.send_to(data, addr).await?; + } } Ok(()) @@ -558,12 +565,14 @@ pub(crate) async fn send_symmetric_hole_punch_packet( let port = ports[cur_port_idx % ports.len()]; for pub_ip in public_ips { let addr = SocketAddr::V4(SocketAddrV4::new(*pub_ip, port)); - let packet = new_hole_punch_packet(transaction_id, HOLE_PUNCH_PACKET_BODY_LEN); - udp.send_to(&packet.into_bytes(), addr).await?; + for _ in 0..3 { + let packet = new_hole_punch_packet(transaction_id, HOLE_PUNCH_PACKET_BODY_LEN); + udp.send_to(&packet.into_bytes(), addr).await?; + } sent_packets += 1; } cur_port_idx = cur_port_idx.wrapping_add(1); - tokio::time::sleep(Duration::from_millis(3)).await; + tokio::time::sleep(Duration::from_millis(1)).await; } Ok(cur_port_idx % ports.len()) } diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index 3a563b1..dbba9e4 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -23,7 +23,7 @@ use crate::{ proto::{ peer_rpc::{ SelectPunchListenerRequest, SendPunchPacketEasySymRequest, - SendPunchPacketHardSymRequest, SendPunchPacketHardSymResponse, + SendPunchPacketHardSymRequest, SendPunchPacketHardSymResponse, UdpHolePunchRpc, UdpHolePunchRpcClientFactory, }, rpc_types::{self, controller::BaseController}, @@ -113,16 +113,19 @@ impl PunchSymToConeHoleServer { ?public_ips, "send_punch_packet_easy_sym send to ports" ); - send_symmetric_hole_punch_packet( - &ports, - listener, - transaction_id, - &public_ips, - 0, - ports.len(), - ) - .await - .with_context(|| "failed to send symmetric hole punch packet")?; + + for _ in 0..2 { + send_symmetric_hole_punch_packet( + &ports, + listener.clone(), + transaction_id, + &public_ips, + 0, + ports.len(), + ) + .await + .with_context(|| "failed to send symmetric hole punch packet")?; + } Ok(()) } @@ -170,16 +173,19 @@ impl PunchSymToConeHoleServer { max_k2 = max_k2.mul(2).div(round).max(max_k1); } - let next_port_index = send_symmetric_hole_punch_packet( - &self.shuffled_port_vec, - listener.clone(), - transaction_id, - &public_ips, - last_port_index, - max_k2 as usize, - ) - .await - .with_context(|| "failed to send symmetric hole punch packet randomly")?; + let mut next_port_index = 0; + for _ in 0..2 { + next_port_index = send_symmetric_hole_punch_packet( + &self.shuffled_port_vec, + listener.clone(), + transaction_id, + &public_ips, + last_port_index, + max_k2 as usize, + ) + .await + .with_context(|| "failed to send symmetric hole punch packet randomly")?; + } return Ok(SendPunchPacketHardSymResponse { next_port_index: next_port_index as u32, @@ -251,6 +257,133 @@ impl PunchSymToConeHoleClient { } } + async fn remote_send_hole_punch_packet_predicable< + S: UdpHolePunchRpc, + >( + rpc_stub: S, + base_port_for_easy_sym: Option, + my_nat_info: UdpNatType, + remote_mapped_addr: crate::proto::common::SocketAddr, + public_ips: Vec, + tid: u32, + ) { + let Some(inc) = my_nat_info.get_inc_of_easy_sym() else { + return; + }; + let req = SendPunchPacketEasySymRequest { + listener_mapped_addr: remote_mapped_addr.clone().into(), + public_ips: public_ips.clone().into_iter().map(|x| x.into()).collect(), + transaction_id: tid, + base_port_num: base_port_for_easy_sym.unwrap() as u32, + max_port_num: 50, + is_incremental: inc, + }; + tracing::debug!(?req, "send punch packet for easy sym start"); + let ret = rpc_stub + .send_punch_packet_easy_sym( + BaseController { + timeout_ms: 4000, + trace_id: 0, + }, + req, + ) + .await; + tracing::debug!(?ret, "send punch packet for easy sym return"); + } + + async fn remote_send_hole_punch_packet_random< + S: UdpHolePunchRpc, + >( + rpc_stub: S, + remote_mapped_addr: crate::proto::common::SocketAddr, + public_ips: Vec, + tid: u32, + round: u32, + port_index: u32, + ) -> Option { + let req = SendPunchPacketHardSymRequest { + listener_mapped_addr: remote_mapped_addr.clone().into(), + public_ips: public_ips.clone().into_iter().map(|x| x.into()).collect(), + transaction_id: tid, + round, + port_index, + }; + tracing::debug!(?req, "send punch packet for hard sym start"); + match rpc_stub + .send_punch_packet_hard_sym( + BaseController { + timeout_ms: 4000, + trace_id: 0, + }, + req, + ) + .await + { + Err(e) => { + tracing::error!(?e, "failed to send punch packet for hard sym"); + return None; + } + Ok(resp) => return Some(resp.next_port_index), + } + } + + async fn get_rpc_stub( + &self, + dst_peer_id: PeerId, + ) -> Box<(dyn UdpHolePunchRpc + std::marker::Send + 'static)> { + self.peer_mgr + .get_peer_rpc_mgr() + .rpc_client() + .scoped_client::>( + self.peer_mgr.my_peer_id(), + dst_peer_id, + self.peer_mgr.get_global_ctx().get_network_name(), + ) + } + + async fn check_hole_punch_result( + udp_array: &Arc, + packet: &[u8], + tid: u32, + remote_mapped_addr: crate::proto::common::SocketAddr, + scoped_punch_task: &ScopedTask, + ) -> Result>, anyhow::Error> { + // no matter what the result is, we should check if we received any hole punching packet + let mut ret_tunnel: Option> = None; + let mut finish_time: Option = None; + while finish_time.is_none() || finish_time.as_ref().unwrap().elapsed().as_millis() < 1000 { + udp_array + .send_with_all(&packet, remote_mapped_addr.into()) + .await?; + + tokio::time::sleep(Duration::from_millis(200)).await; + + if finish_time.is_none() && (*scoped_punch_task).is_finished() { + finish_time = Some(Instant::now()); + } + + let Some(socket) = udp_array.try_fetch_punched_socket(tid) else { + tracing::debug!("no punched socket found, wait for more time"); + continue; + }; + + // if hole punched but tunnel creation failed, need to retry entire process. + match try_connect_with_socket(socket.socket.clone(), remote_mapped_addr.into()).await { + Ok(tunnel) => { + ret_tunnel.replace(tunnel); + break; + } + Err(e) => { + tracing::error!(?e, "failed to connect with socket"); + udp_array.add_new_socket(socket.socket).await?; + continue; + } + } + } + + Ok(ret_tunnel) + } + #[tracing::instrument(err(level = Level::ERROR), skip(self))] pub(crate) async fn do_hole_punching( &self, @@ -309,99 +442,61 @@ impl PunchSymToConeHoleClient { let packet = new_hole_punch_packet(tid, HOLE_PUNCH_PACKET_BODY_LEN).into_bytes(); udp_array.add_intreast_tid(tid); defer! { udp_array.remove_intreast_tid(tid);} + + let port_index = *last_port_idx as u32; + let base_port_for_easy_sym = self.get_base_port_for_easy_sym(my_nat_info).await; udp_array .send_with_all(&packet, remote_mapped_addr.into()) .await?; - let port_index = *last_port_idx as u32; - let base_port_for_easy_sym = self.get_base_port_for_easy_sym(my_nat_info).await; - let punch_random = self.punch_randomly.load(Ordering::Relaxed); - let punch_predicable = self.punch_predicablely.load(Ordering::Relaxed); - let scoped_punch_task: ScopedTask> = tokio::spawn(async move { - if punch_predicable && base_port_for_easy_sym.is_some() { - if let Some(inc) = my_nat_info.get_inc_of_easy_sym() { - let req = SendPunchPacketEasySymRequest { - listener_mapped_addr: remote_mapped_addr.clone().into(), - public_ips: public_ips.clone().into_iter().map(|x| x.into()).collect(), - transaction_id: tid, - base_port_num: base_port_for_easy_sym.unwrap() as u32, - max_port_num: 50, - is_incremental: inc, - }; - tracing::debug!(?req, "send punch packet for easy sym start"); - let ret = rpc_stub - .send_punch_packet_easy_sym( - BaseController { - timeout_ms: 4000, - trace_id: 0, - }, - req, - ) - .await; - tracing::debug!(?ret, "send punch packet for easy sym return"); - } - } + if self.punch_predicablely.load(Ordering::Relaxed) && base_port_for_easy_sym.is_some() { + let rpc_stub = self.get_rpc_stub(dst_peer_id).await; + let scoped_punch_task: ScopedTask<()> = + tokio::spawn(Self::remote_send_hole_punch_packet_predicable( + rpc_stub, + base_port_for_easy_sym, + my_nat_info, + remote_mapped_addr.clone(), + public_ips.clone(), + tid, + )) + .into(); + let ret_tunnel = Self::check_hole_punch_result( + &udp_array, + &packet, + tid, + remote_mapped_addr.clone(), + &scoped_punch_task, + ) + .await?; - if punch_random { - let req = SendPunchPacketHardSymRequest { - listener_mapped_addr: remote_mapped_addr.clone().into(), - public_ips: public_ips.clone().into_iter().map(|x| x.into()).collect(), - transaction_id: tid, - round, - port_index, - }; - tracing::debug!(?req, "send punch packet for hard sym start"); - match rpc_stub - .send_punch_packet_hard_sym( - BaseController { - timeout_ms: 4000, - trace_id: 0, - }, - req, - ) - .await - { - Err(e) => { - tracing::error!(?e, "failed to send punch packet for hard sym"); - return None; - } - Ok(resp) => return Some(resp.next_port_index), - } - } - - None - }) - .into(); - - // no matter what the result is, we should check if we received any hole punching packet - let mut ret_tunnel: Option> = None; - let mut finish_time: Option = None; - while finish_time.is_none() || finish_time.as_ref().unwrap().elapsed().as_millis() < 1000 { - tokio::time::sleep(Duration::from_millis(200)).await; - - if finish_time.is_none() && (*scoped_punch_task).is_finished() { - finish_time = Some(Instant::now()); - } - - let Some(socket) = udp_array.try_fetch_punched_socket(tid) else { - tracing::debug!("no punched socket found, wait for more time"); - continue; - }; - - // if hole punched but tunnel creation failed, need to retry entire process. - match try_connect_with_socket(socket.socket.clone(), remote_mapped_addr.into()).await { - Ok(tunnel) => { - ret_tunnel.replace(tunnel); - break; - } - Err(e) => { - tracing::error!(?e, "failed to connect with socket"); - udp_array.add_new_socket(socket.socket).await?; - continue; - } + let task_ret = scoped_punch_task.await; + tracing::debug!(?ret_tunnel, ?task_ret, "predictable punch task got result"); + if let Some(tunnel) = ret_tunnel { + return Ok(Some(tunnel)); } } + let rpc_stub = self.get_rpc_stub(dst_peer_id).await; + let scoped_punch_task: ScopedTask> = + tokio::spawn(Self::remote_send_hole_punch_packet_random( + rpc_stub, + remote_mapped_addr.clone(), + public_ips.clone(), + tid, + round, + port_index, + )) + .into(); + let ret_tunnel = Self::check_hole_punch_result( + &udp_array, + &packet, + tid, + remote_mapped_addr.clone(), + &scoped_punch_task, + ) + .await?; + let punch_task_result = scoped_punch_task.await; tracing::debug!(?punch_task_result, ?ret_tunnel, "punch task got result");