use std::{ collections::BTreeSet, io, net::Ipv4Addr, pin::Pin, sync::{Arc, Weak}, task::{Context, Poll}, }; use crate::{ common::{ error::Error, global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, ifcfg::{IfConfiger, IfConfiguerTrait}, }, peers::{peer_manager::PeerManager, recv_packet_from_chan, PacketRecvChanReceiver}, tunnel::{ common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes}, packet_def::{ZCPacket, ZCPacketType, TAIL_RESERVED_SIZE}, StreamItem, Tunnel, TunnelError, ZCPacketSink, ZCPacketStream, }, }; use byteorder::WriteBytesExt as _; use bytes::{BufMut, BytesMut}; use futures::{lock::BiLock, ready, SinkExt, Stream, StreamExt}; use pin_project_lite::pin_project; use pnet::packet::ipv4::Ipv4Packet; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, sync::Mutex, task::JoinSet, }; use tokio_util::bytes::Bytes; use tun::{AbstractDevice, AsyncDevice, Configuration, Layer}; use zerocopy::{NativeEndian, NetworkEndian}; #[cfg(target_os = "windows")] use crate::common::ifcfg::RegistryManager; pin_project! { pub struct TunStream { #[pin] l: BiLock, cur_buf: BytesMut, has_packet_info: bool, payload_offset: usize, } } impl TunStream { pub fn new(l: BiLock, has_packet_info: bool) -> Self { let mut payload_offset = ZCPacketType::NIC.get_packet_offsets().payload_offset; if has_packet_info { payload_offset -= 4; } Self { l, cur_buf: BytesMut::new(), has_packet_info, payload_offset, } } } impl Stream for TunStream { type Item = StreamItem; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut self_mut = self.project(); let mut g = ready!(self_mut.l.poll_lock(cx)); reserve_buf(&mut self_mut.cur_buf, 2500, 4 * 1024); if self_mut.cur_buf.len() == 0 { unsafe { self_mut.cur_buf.set_len(*self_mut.payload_offset); } } let buf = self_mut.cur_buf.chunk_mut().as_mut_ptr(); let buf = unsafe { std::slice::from_raw_parts_mut(buf, 2500) }; let mut buf = ReadBuf::new(buf); let ret = ready!(g.as_pin_mut().poll_read(cx, &mut buf)); let len = buf.filled().len(); if len == 0 { return Poll::Ready(None); } unsafe { self_mut.cur_buf.advance_mut(len + TAIL_RESERVED_SIZE) }; let mut ret_buf = self_mut.cur_buf.split(); let cur_len = ret_buf.len(); ret_buf.truncate(cur_len - TAIL_RESERVED_SIZE); match ret { Ok(_) => Poll::Ready(Some(Ok(ZCPacket::new_from_buf(ret_buf, ZCPacketType::NIC)))), Err(err) => { println!("tun stream error: {:?}", err); Poll::Ready(None) } } } } #[derive(Debug, Clone, Copy, Default)] enum PacketProtocol { #[default] IPv4, IPv6, Other(u8), } // Note: the protocol in the packet information header is platform dependent. impl PacketProtocol { #[cfg(any(target_os = "linux", target_os = "android"))] fn into_pi_field(self) -> Result { use nix::libc; match self { PacketProtocol::IPv4 => Ok(libc::ETH_P_IP as u16), PacketProtocol::IPv6 => Ok(libc::ETH_P_IPV6 as u16), PacketProtocol::Other(_) => Err(io::Error::new( io::ErrorKind::Other, "neither an IPv4 nor IPv6 packet", )), } } #[cfg(any(target_os = "macos", target_os = "ios", target_os = "freebsd"))] fn into_pi_field(self) -> Result { use nix::libc; match self { PacketProtocol::IPv4 => Ok(libc::PF_INET as u16), PacketProtocol::IPv6 => Ok(libc::PF_INET6 as u16), PacketProtocol::Other(_) => Err(io::Error::new( io::ErrorKind::Other, "neither an IPv4 nor IPv6 packet", )), } } #[cfg(target_os = "windows")] fn into_pi_field(self) -> Result { unimplemented!() } } /// Infer the protocol based on the first nibble in the packet buffer. fn infer_proto(buf: &[u8]) -> PacketProtocol { match buf[0] >> 4 { 4 => PacketProtocol::IPv4, 6 => PacketProtocol::IPv6, p => PacketProtocol::Other(p), } } struct TunZCPacketToBytes { has_packet_info: bool, } impl TunZCPacketToBytes { pub fn new(has_packet_info: bool) -> Self { Self { has_packet_info } } pub fn fill_packet_info( &self, mut buf: &mut [u8], proto: PacketProtocol, ) -> Result<(), io::Error> { // flags is always 0 buf.write_u16::(0)?; // write the protocol as network byte order buf.write_u16::(proto.into_pi_field()?)?; Ok(()) } } impl ZCPacketToBytes for TunZCPacketToBytes { fn into_bytes(&self, zc_packet: ZCPacket) -> Result { let payload_offset = zc_packet.payload_offset(); let mut inner = zc_packet.inner(); // we have peer manager header, so payload offset must larger than 4 assert!(payload_offset >= 4); let ret = if self.has_packet_info { let mut inner = inner.split_off(payload_offset - 4); let proto = infer_proto(&inner[4..]); self.fill_packet_info(&mut inner[0..4], proto)?; inner } else { inner.split_off(payload_offset) }; tracing::debug!(?ret, ?payload_offset, "convert zc packet to tun packet"); Ok(ret.into()) } } pin_project! { pub struct TunAsyncWrite { #[pin] l: BiLock, } } impl AsyncWrite for TunAsyncWrite { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { let self_mut = self.project(); let mut g = ready!(self_mut.l.poll_lock(cx)); g.as_pin_mut().poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let self_mut = self.project(); let mut g = ready!(self_mut.l.poll_lock(cx)); g.as_pin_mut().poll_flush(cx) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let self_mut = self.project(); let mut g = ready!(self_mut.l.poll_lock(cx)); g.as_pin_mut().poll_shutdown(cx) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { let self_mut = self.project(); let mut g = ready!(self_mut.l.poll_lock(cx)); g.as_pin_mut().poll_write_vectored(cx, bufs) } fn is_write_vectored(&self) -> bool { true } } pub struct VirtualNic { global_ctx: ArcGlobalCtx, ifname: Option, ifcfg: Box, } impl VirtualNic { pub fn new(global_ctx: ArcGlobalCtx) -> Self { Self { global_ctx, ifname: None, ifcfg: Box::new(IfConfiger {}), } } async fn create_tun(&mut self) -> Result { let mut config = Configuration::default(); config.layer(Layer::L3); #[cfg(target_os = "linux")] { let dev_name = self.global_ctx.get_flags().dev_name; if !dev_name.is_empty() { config.tun_name(format!("{}", dev_name)); } } #[cfg(any(target_os = "macos"))] config.platform_config(|config| { // disable packet information so we can process the header by ourselves, see tun2 impl for more details config.packet_information(false); }); #[cfg(target_os = "windows")] { let dev_name = self.global_ctx.get_flags().dev_name; match crate::arch::windows::add_self_to_firewall_allowlist() { Ok(_) => tracing::info!("add_self_to_firewall_allowlist successful!"), Err(e) => { println!("Failed to add Easytier to firewall allowlist, Subnet proxy and KCP proxy may not work properly. error: {}", e); println!("You can add firewall rules manually, or use --use-smoltcp to run with user-space TCP/IP stack."); println!(""); } } match RegistryManager::reg_delete_obsoleted_items(&dev_name) { Ok(_) => tracing::trace!("delete successful!"), Err(e) => tracing::error!("An error occurred: {}", e), } if !dev_name.is_empty() { config.tun_name(format!("{}", dev_name)); } else { use rand::distributions::Distribution as _; let c = crate::arch::windows::interface_count()?; let mut rng = rand::thread_rng(); let s: String = rand::distributions::Alphanumeric .sample_iter(&mut rng) .take(4) .map(char::from) .collect::() .to_lowercase(); let random_dev_name = format!("et_{}_{}", c, s); config.tun_name(random_dev_name.clone()); let mut flags = self.global_ctx.get_flags(); flags.dev_name = random_dev_name.clone(); self.global_ctx.set_flags(flags); } config.platform_config(|config| { config.skip_config(true); config.ring_cap(Some(std::cmp::min( config.min_ring_cap() * 32, config.max_ring_cap(), ))); }); } config.up(); let _g = self.global_ctx.net_ns.guard(); Ok(tun::create(&config)?) } #[cfg(target_os = "android")] pub async fn create_dev_for_android( &mut self, tun_fd: std::os::fd::RawFd, ) -> Result, Error> { println!("tun_fd: {}", tun_fd); let mut config = Configuration::default(); config.layer(Layer::L3); config.raw_fd(tun_fd); config.close_fd_on_drop(false); config.up(); let dev = tun::create(&config)?; let dev = AsyncDevice::new(dev)?; let (a, b) = BiLock::new(dev); let ft = TunnelWrapper::new( TunStream::new(a, false), FramedWriter::new_with_converter( TunAsyncWrite { l: b }, TunZCPacketToBytes::new(false), ), None, ); self.ifname = Some(format!("tunfd_{}", tun_fd)); Ok(Box::new(ft)) } pub async fn create_dev(&mut self) -> Result, Error> { let dev = self.create_tun().await?; let ifname = dev.tun_name()?; self.ifcfg.wait_interface_show(ifname.as_str()).await?; #[cfg(target_os = "windows")] { if let Ok(guid) = RegistryManager::find_interface_guid(&ifname) { if let Err(e) = RegistryManager::disable_dynamic_updates(&guid) { tracing::error!( "Failed to disable dhcp for interface {} {}: {}", ifname, guid, e ); } // Disable NetBIOS over TCP/IP if let Err(e) = RegistryManager::disable_netbios(&guid) { tracing::error!( "Failed to disable netbios for interface {} {}: {}", ifname, guid, e ); } } } let dev = AsyncDevice::new(dev)?; let flags = self.global_ctx.config.get_flags(); let mut mtu_in_config = flags.mtu; if flags.enable_encryption { mtu_in_config -= 20; } { // set mtu by ourselves, rust-tun does not handle it correctly on windows let _g = self.global_ctx.net_ns.guard(); self.ifcfg .set_mtu(ifname.as_str(), mtu_in_config as u32) .await?; } let has_packet_info = cfg!(target_os = "macos"); let (a, b) = BiLock::new(dev); let ft = TunnelWrapper::new( TunStream::new(a, has_packet_info), FramedWriter::new_with_converter( TunAsyncWrite { l: b }, TunZCPacketToBytes::new(has_packet_info), ), None, ); self.ifname = Some(ifname.to_owned()); Ok(Box::new(ft)) } pub fn ifname(&self) -> &str { self.ifname.as_ref().unwrap().as_str() } pub async fn link_up(&self) -> Result<(), Error> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg.set_link_status(self.ifname(), true).await?; Ok(()) } pub async fn add_route(&self, address: Ipv4Addr, cidr: u8) -> Result<(), Error> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg .add_ipv4_route(self.ifname(), address, cidr, None) .await?; Ok(()) } pub async fn remove_ip(&self, ip: Option) -> Result<(), Error> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg.remove_ip(self.ifname(), ip).await?; Ok(()) } pub async fn add_ip(&self, ip: Ipv4Addr, cidr: i32) -> Result<(), Error> { let _g = self.global_ctx.net_ns.guard(); self.ifcfg .add_ipv4_ip(self.ifname(), ip, cidr as u8) .await?; Ok(()) } pub fn get_ifcfg(&self) -> impl IfConfiguerTrait { IfConfiger {} } } pub struct NicCtx { global_ctx: ArcGlobalCtx, peer_mgr: Weak, peer_packet_receiver: Arc>, nic: Arc>, tasks: JoinSet<()>, } impl NicCtx { pub fn new( global_ctx: ArcGlobalCtx, peer_manager: &Arc, peer_packet_receiver: Arc>, ) -> Self { NicCtx { global_ctx: global_ctx.clone(), peer_mgr: Arc::downgrade(&peer_manager), peer_packet_receiver, nic: Arc::new(Mutex::new(VirtualNic::new(global_ctx))), tasks: JoinSet::new(), } } pub async fn ifname(&self) -> Option { let nic = self.nic.lock().await; nic.ifname.as_ref().map(|s| s.to_owned()) } pub async fn assign_ipv4_to_tun_device(&self, ipv4_addr: cidr::Ipv4Inet) -> Result<(), Error> { let nic = self.nic.lock().await; nic.link_up().await?; nic.remove_ip(None).await?; nic.add_ip(ipv4_addr.address(), ipv4_addr.network_length() as i32) .await?; #[cfg(any(target_os = "macos", target_os = "freebsd"))] { nic.add_route(ipv4_addr.first_address(), ipv4_addr.network_length()) .await?; } Ok(()) } async fn do_forward_nic_to_peers_ipv4(ret: ZCPacket, mgr: &PeerManager) { if let Some(ipv4) = Ipv4Packet::new(ret.payload()) { if ipv4.get_version() != 4 { tracing::info!("[USER_PACKET] not ipv4 packet: {:?}", ipv4); return; } let dst_ipv4 = ipv4.get_destination(); tracing::trace!( ?ret, "[USER_PACKET] recv new packet from tun device and forward to peers." ); // TODO: use zero-copy let send_ret = mgr.send_msg_ipv4(ret, dst_ipv4).await; if send_ret.is_err() { tracing::trace!(?send_ret, "[USER_PACKET] send_msg_ipv4 failed") } } else { tracing::warn!(?ret, "[USER_PACKET] not ipv4 packet"); } } fn do_forward_nic_to_peers( &mut self, mut stream: Pin>, ) -> Result<(), Error> { // read from nic and write to corresponding tunnel let Some(mgr) = self.peer_mgr.upgrade() else { return Err(anyhow::anyhow!("peer manager not available").into()); }; self.tasks.spawn(async move { while let Some(ret) = stream.next().await { if ret.is_err() { tracing::error!("read from nic failed: {:?}", ret); break; } Self::do_forward_nic_to_peers_ipv4(ret.unwrap(), mgr.as_ref()).await; } panic!("nic stream closed"); }); Ok(()) } fn do_forward_peers_to_nic(&mut self, mut sink: Pin>) { let channel = self.peer_packet_receiver.clone(); self.tasks.spawn(async move { // unlock until coroutine finished let mut channel = channel.lock().await; while let Ok(packet) = recv_packet_from_chan(&mut channel).await { tracing::trace!( "[USER_PACKET] forward packet from peers to nic. packet: {:?}", packet ); let ret = sink.send(packet).await; if ret.is_err() { tracing::error!(?ret, "do_forward_tunnel_to_nic sink error"); } } panic!("peer packet receiver closed"); }); } async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> { let Some(peer_mgr) = self.peer_mgr.upgrade() else { return Err(anyhow::anyhow!("peer manager not available").into()); }; let global_ctx = self.global_ctx.clone(); let net_ns = self.global_ctx.net_ns.clone(); let nic = self.nic.lock().await; let ifcfg = nic.get_ifcfg(); let ifname = nic.ifname().to_owned(); self.tasks.spawn(async move { let mut cur_proxy_cidrs = BTreeSet::new(); loop { let mut proxy_cidrs = BTreeSet::new(); let routes = peer_mgr.list_routes().await; for r in routes { for cidr in r.proxy_cidrs { let Ok(cidr) = cidr.parse::() else { continue; }; proxy_cidrs.insert(cidr); } } // add vpn portal cidr to proxy_cidrs if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() { proxy_cidrs.insert(vpn_cfg.client_cidr); } if let Some(routes) = global_ctx.config.get_routes() { // if has manual routes, just override entire proxy_cidrs proxy_cidrs = routes.into_iter().collect(); } // if route is in cur_proxy_cidrs but not in proxy_cidrs, delete it. for cidr in cur_proxy_cidrs.iter() { if proxy_cidrs.contains(cidr) { continue; } let _g = net_ns.guard(); let ret = ifcfg .remove_ipv4_route( ifname.as_str(), cidr.first_address(), cidr.network_length(), ) .await; if ret.is_err() { tracing::trace!( cidr = ?cidr, err = ?ret, "remove route failed.", ); } } for cidr in proxy_cidrs.iter() { if cur_proxy_cidrs.contains(cidr) { continue; } let _g = net_ns.guard(); let ret = ifcfg .add_ipv4_route( ifname.as_str(), cidr.first_address(), cidr.network_length(), None, ) .await; if ret.is_err() { tracing::trace!( cidr = ?cidr, err = ?ret, "add route failed.", ); } } cur_proxy_cidrs = proxy_cidrs; tokio::time::sleep(std::time::Duration::from_secs(1)).await; } }); Ok(()) } pub async fn run(&mut self, ipv4_addr: cidr::Ipv4Inet) -> Result<(), Error> { let tunnel = { let mut nic = self.nic.lock().await; match nic.create_dev().await { Ok(ret) => { #[cfg(target_os = "windows")] { let dev_name = self.global_ctx.get_flags().dev_name; let _ = RegistryManager::reg_change_catrgory_in_profile(&dev_name); } self.global_ctx .issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string())); ret } Err(err) => { self.global_ctx .issue_event(GlobalCtxEvent::TunDeviceError(err.to_string())); return Err(err); } } }; let (stream, sink) = tunnel.split(); self.do_forward_nic_to_peers(stream)?; self.do_forward_peers_to_nic(sink); self.assign_ipv4_to_tun_device(ipv4_addr).await?; self.run_proxy_cidrs_route_updater().await?; Ok(()) } #[cfg(target_os = "android")] pub async fn run_for_android(&mut self, tun_fd: std::os::fd::RawFd) -> Result<(), Error> { let tunnel = { let mut nic = self.nic.lock().await; match nic.create_dev_for_android(tun_fd).await { Ok(ret) => { self.global_ctx .issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string())); ret } Err(err) => { self.global_ctx .issue_event(GlobalCtxEvent::TunDeviceError(err.to_string())); return Err(err); } } }; let (stream, sink) = tunnel.split(); self.do_forward_nic_to_peers(stream)?; self.do_forward_peers_to_nic(sink); Ok(()) } } #[cfg(test)] mod tests { use crate::common::{error::Error, global_ctx::tests::get_mock_global_ctx}; use super::VirtualNic; async fn run_test_helper() -> Result { let mut dev = VirtualNic::new(get_mock_global_ctx()); let _tunnel = dev.create_dev().await?; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; dev.link_up().await?; dev.remove_ip(None).await?; dev.add_ip("10.144.111.1".parse().unwrap(), 24).await?; Ok(dev) } #[tokio::test] async fn tun_test() { let _dev = run_test_helper().await.unwrap(); // let mut stream = nic.pin_recv_stream(); // while let Some(item) = stream.next().await { // println!("item: {:?}", item); // } // let framed = dev.into_framed(); // let (mut s, mut b) = framed.split(); // loop { // let tmp = b.next().await.unwrap().unwrap(); // let tmp = EthernetPacket::new(tmp.get_bytes()); // println!("ret: {:?}", tmp.unwrap()); // } } }