diff --git a/easytier/src/common/stats_manager.rs b/easytier/src/common/stats_manager.rs index a950cd5..b7d8077 100644 --- a/easytier/src/common/stats_manager.rs +++ b/easytier/src/common/stats_manager.rs @@ -66,6 +66,8 @@ pub enum MetricName { CompressionBytesTxBefore, /// Compression bytes after compression CompressionBytesTxAfter, + + TcpProxyConnect, } impl fmt::Display for MetricName { @@ -112,6 +114,8 @@ impl fmt::Display for MetricName { MetricName::CompressionBytesRxAfter => write!(f, "compression_bytes_rx_after"), MetricName::CompressionBytesTxBefore => write!(f, "compression_bytes_tx_before"), MetricName::CompressionBytesTxAfter => write!(f, "compression_bytes_tx_after"), + + MetricName::TcpProxyConnect => write!(f, "tcp_proxy_connect"), } } } @@ -139,6 +143,10 @@ pub enum LabelType { ErrorType(String), /// Status Status(String), + /// Dst Ip + DstIp(String), + /// Mapped Dst Ip + MappedDstIp(String), } impl fmt::Display for LabelType { @@ -154,6 +162,8 @@ impl fmt::Display for LabelType { LabelType::CompressionAlgo(algo) => write!(f, "compression_algo={}", algo), LabelType::ErrorType(err) => write!(f, "error_type={}", err), LabelType::Status(status) => write!(f, "status={}", status), + LabelType::DstIp(ip) => write!(f, "dst_ip={}", ip), + LabelType::MappedDstIp(ip) => write!(f, "mapped_dst_ip={}", ip), } } } @@ -171,6 +181,8 @@ impl LabelType { LabelType::CompressionAlgo(_) => "compression_algo", LabelType::ErrorType(_) => "error_type", LabelType::Status(_) => "status", + LabelType::DstIp(_) => "dst_ip", + LabelType::MappedDstIp(_) => "mapped_dst_ip", } } @@ -186,6 +198,8 @@ impl LabelType { LabelType::CompressionAlgo(algo) => algo.clone(), LabelType::ErrorType(err) => err.clone(), LabelType::Status(status) => status.clone(), + LabelType::DstIp(ip) => ip.clone(), + LabelType::MappedDstIp(ip) => ip.clone(), } } } diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 9755162..aeff9d5 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -266,6 +266,11 @@ impl> NicPacketFilt .check_dst_allow_kcp_input(&ip_packet.get_destination()) .await { + tracing::warn!( + "{:?} proxy src: dst {} not allow kcp input", + self.get_tcp_proxy().get_transport_type(), + ip_packet.get_destination() + ); return false; } } else { @@ -288,6 +293,12 @@ impl> NicPacketFilt if ip_packet.get_source() != my_ipv4.address() && !self.get_tcp_proxy().is_smoltcp_enabled() { + tracing::warn!( + "{:?} nat 2 nat packet, src: {} dst: {} not allow kcp input", + self.get_tcp_proxy().get_transport_type(), + ip_packet.get_source(), + ip_packet.get_destination() + ); return false; } }; diff --git a/easytier/src/gateway/quic_proxy.rs b/easytier/src/gateway/quic_proxy.rs index 1631855..ce95f91 100644 --- a/easytier/src/gateway/quic_proxy.rs +++ b/easytier/src/gateway/quic_proxy.rs @@ -200,6 +200,11 @@ impl TcpProxyForKcpSrcTrait for TcpProxyForQUICSrc { let Some(peer_info) = peer_map.get_route_peer_info(dst_peer_id).await else { return false; }; + tracing::debug!( + "check dst {} allow quic input, peer info: {:?}", + dst_ip, + peer_info + ); let Some(quic_port) = peer_info.quic_port else { return false; }; diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 12ed94f..17ec194 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -24,6 +24,7 @@ use crate::common::error::Result; use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx}; use crate::common::join_joinset_background; +use crate::common::stats_manager::{LabelSet, LabelType, MetricName}; use crate::peers::peer_manager::PeerManager; use crate::peers::{NicPacketFilter, PeerPacketFilter}; use crate::proto::cli::{ @@ -722,6 +723,21 @@ impl TcpProxy { nat_entry.real_dst }; + global_ctx + .stats_manager() + .get_counter( + MetricName::TcpProxyConnect, + LabelSet::new() + .with_label_type(LabelType::Protocol( + connector.transport_type().as_str_name().to_string(), + )) + .with_label_type(LabelType::DstIp(nat_dst.ip().to_string())) + .with_label_type(LabelType::MappedDstIp( + nat_entry.mapped_dst.ip().to_string(), + )), + ) + .inc(); + let _guard = global_ctx.net_ns.guard(); let Ok(dst_tcp_stream) = connector.connect(nat_entry.src, nat_dst).await else { tracing::error!("connect to dst failed: {:?}", nat_entry); diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 291dc09..9ad6a43 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -527,7 +527,7 @@ impl Instance { } async fn run_quic_dst(&mut self) -> Result<(), Error> { - if !self.global_ctx.get_flags().enable_quic_proxy { + if self.global_ctx.get_flags().disable_quic_input { return Ok(()); } diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 9ef2330..aecdcaa 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -1386,6 +1386,11 @@ impl PeerManager { return false; }; + if next_hop_id == dst_peer_id { + // dst p2p, no need to relay + return true; + } + let Some(next_hop_info) = route.get_peer_info(next_hop_id).await else { return false; }; diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index ff16b75..0b46617 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -15,9 +15,10 @@ use crate::{ common::{ config::{ConfigLoader, NetworkIdentity, PortForwardConfig, TomlConfigLoader}, netns::{NetNS, ROOT_NETNS_NAME}, + stats_manager::{LabelType, MetricName}, }, instance::instance::Instance, - proto::common::CompressionAlgoPb, + proto::{cli::TcpProxyEntryTransportType, common::CompressionAlgoPb}, tunnel::{ common::tests::{_tunnel_bench_netns, wait_for_condition}, ring::RingTunnelConnector, @@ -340,11 +341,12 @@ pub async fn basic_three_node_test( drop_insts(insts).await; } -async fn subnet_proxy_test_udp(target_ip: &str) { +async fn subnet_proxy_test_udp(listen_ip: &str, target_ip: &str) { use crate::tunnel::{common::tests::_tunnel_pingpong_netns, udp::UdpTunnelListener}; use rand::Rng; - let udp_listener = UdpTunnelListener::new("udp://10.1.2.4:22233".parse().unwrap()); + let udp_listener = + UdpTunnelListener::new(format!("udp://{}:22233", listen_ip).parse().unwrap()); let udp_connector = UdpTunnelConnector::new(format!("udp://{}:22233", target_ip).parse().unwrap()); @@ -352,17 +354,24 @@ async fn subnet_proxy_test_udp(target_ip: &str) { let mut buf = vec![0; 7 * 1024]; rand::thread_rng().fill(&mut buf[..]); + let ns_name = if target_ip == "10.144.144.3" { + "net_c" + } else { + "net_d" + }; + _tunnel_pingpong_netns( udp_listener, udp_connector, - NetNS::new(Some("net_d".into())), + NetNS::new(Some(ns_name.into())), NetNS::new(Some("net_a".into())), buf, ) .await; // no fragment - let udp_listener = UdpTunnelListener::new("udp://10.1.2.4:22233".parse().unwrap()); + let udp_listener = + UdpTunnelListener::new(format!("udp://{}:22233", listen_ip).parse().unwrap()); let udp_connector = UdpTunnelConnector::new(format!("udp://{}:22233", target_ip).parse().unwrap()); @@ -372,77 +381,34 @@ async fn subnet_proxy_test_udp(target_ip: &str) { _tunnel_pingpong_netns( udp_listener, udp_connector, - NetNS::new(Some("net_d".into())), - NetNS::new(Some("net_a".into())), - buf, - ) - .await; - - // connect to virtual ip (no tun mode) - - let udp_listener = UdpTunnelListener::new("udp://0.0.0.0:22234".parse().unwrap()); - let udp_connector = UdpTunnelConnector::new("udp://10.144.144.3:22234".parse().unwrap()); - // NOTE: this should not excced udp tunnel max buffer size - let mut buf = vec![0; 7 * 1024]; - rand::thread_rng().fill(&mut buf[..]); - - _tunnel_pingpong_netns( - udp_listener, - udp_connector, - NetNS::new(Some("net_c".into())), - NetNS::new(Some("net_a".into())), - buf, - ) - .await; - - // no fragment - let udp_listener = UdpTunnelListener::new("udp://0.0.0.0:22235".parse().unwrap()); - let udp_connector = UdpTunnelConnector::new("udp://10.144.144.3:22235".parse().unwrap()); - - let mut buf = vec![0; 1024]; - rand::thread_rng().fill(&mut buf[..]); - - _tunnel_pingpong_netns( - udp_listener, - udp_connector, - NetNS::new(Some("net_c".into())), + NetNS::new(Some(ns_name.into())), NetNS::new(Some("net_a".into())), buf, ) .await; } -async fn subnet_proxy_test_tcp(target_ip: &str) { +async fn subnet_proxy_test_tcp(listen_ip: &str, connect_ip: &str) { use crate::tunnel::{common::tests::_tunnel_pingpong_netns, tcp::TcpTunnelListener}; use rand::Rng; - let tcp_listener = TcpTunnelListener::new("tcp://10.1.2.4:22223".parse().unwrap()); + let tcp_listener = TcpTunnelListener::new(format!("tcp://{listen_ip}:22223").parse().unwrap()); let tcp_connector = - TcpTunnelConnector::new(format!("tcp://{}:22223", target_ip).parse().unwrap()); + TcpTunnelConnector::new(format!("tcp://{}:22223", connect_ip).parse().unwrap()); let mut buf = vec![0; 32]; rand::thread_rng().fill(&mut buf[..]); - _tunnel_pingpong_netns( - tcp_listener, - tcp_connector, - NetNS::new(Some("net_d".into())), - NetNS::new(Some("net_a".into())), - buf, - ) - .await; - - // connect to virtual ip (no tun mode) - let tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:22223".parse().unwrap()); - let tcp_connector = TcpTunnelConnector::new("tcp://10.144.144.3:22223".parse().unwrap()); - - let mut buf = vec![0; 32]; - rand::thread_rng().fill(&mut buf[..]); + let ns_name = if connect_ip == "10.144.144.3" { + "net_c" + } else { + "net_d" + }; _tunnel_pingpong_netns( tcp_listener, tcp_connector, - NetNS::new(Some("net_c".into())), + NetNS::new(Some(ns_name.into())), NetNS::new(Some("net_a".into())), buf, ) @@ -461,19 +427,6 @@ async fn subnet_proxy_test_icmp(target_ip: &str) { Duration::from_secs(5), ) .await; - - // connect to virtual ip (no tun mode) - wait_for_condition( - || async { ping_test("net_a", "10.144.144.3", None).await }, - Duration::from_secs(5), - ) - .await; - - wait_for_condition( - || async { ping_test("net_a", "10.144.144.3", Some(5 * 1024)).await }, - Duration::from_secs(5), - ) - .await; } #[tokio::test] @@ -484,6 +437,10 @@ pub async fn quic_proxy() { if cfg.get_inst_name() == "inst3" { cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None) .unwrap(); + } else if cfg.get_inst_name() == "inst1" { + let mut flags = cfg.get_flags(); + flags.enable_quic_proxy = true; + cfg.set_flags(flags); } cfg }, @@ -504,7 +461,17 @@ pub async fn quic_proxy() { let target_ip = "10.1.2.4"; subnet_proxy_test_icmp(target_ip).await; - subnet_proxy_test_tcp(target_ip).await; + subnet_proxy_test_icmp("10.144.144.3").await; + subnet_proxy_test_tcp(target_ip, target_ip).await; + subnet_proxy_test_tcp("0.0.0.0", "10.144.144.3").await; + + let metrics = insts[0] + .get_global_ctx() + .stats_manager() + .get_metrics_by_prefix(&MetricName::TcpProxyConnect.to_string()); + assert_eq!(metrics.len(), 2); + assert_eq!(1, metrics[0].value); + assert_eq!(1, metrics[1].value); drop_insts(insts).await; } @@ -583,10 +550,64 @@ pub async fn subnet_proxy_three_node_test( ) .await; - for target_ip in ["10.1.3.4", "10.1.2.4"].iter() { + for target_ip in ["10.1.3.4", "10.1.2.4", "10.144.144.3"] { subnet_proxy_test_icmp(target_ip).await; - subnet_proxy_test_tcp(target_ip).await; - subnet_proxy_test_udp(target_ip).await; + let listen_ip = if target_ip == "10.144.144.3" { + "0.0.0.0" + } else { + "10.1.2.4" + }; + subnet_proxy_test_tcp(listen_ip, target_ip).await; + subnet_proxy_test_udp(listen_ip, target_ip).await; + } + + if enable_kcp_proxy && !disable_kcp_input { + let metrics = insts[0] + .get_global_ctx() + .stats_manager() + .get_metrics_by_prefix(&MetricName::TcpProxyConnect.to_string()); + assert_eq!(metrics.len(), 3); + for metric in metrics { + assert_eq!(1, metric.value); + assert!(metric.labels.labels().iter().any(|l| { + let t = + LabelType::Protocol(TcpProxyEntryTransportType::Kcp.as_str_name().to_string()); + t.key() == l.key && t.value() == l.value + })); + } + } else if enable_quic_proxy && !disable_quic_input { + let metrics = insts[0] + .get_global_ctx() + .stats_manager() + .get_metrics_by_prefix(&MetricName::TcpProxyConnect.to_string()); + assert_eq!(metrics.len(), 3); + for metric in metrics { + assert_eq!(1, metric.value); + assert!(metric.labels.labels().iter().any(|l| { + let t = + LabelType::Protocol(TcpProxyEntryTransportType::Quic.as_str_name().to_string()); + t.key() == l.key && t.value() == l.value + })); + } + } else { + // tcp subnet proxy + let metrics = insts[2] + .get_global_ctx() + .stats_manager() + .get_metrics_by_prefix(&MetricName::TcpProxyConnect.to_string()); + if no_tun { + assert_eq!(metrics.len(), 3); + } else { + assert_eq!(metrics.len(), 2); + } + for metric in metrics { + assert_eq!(1, metric.value); + assert!(metric.labels.labels().iter().any(|l| { + let t = + LabelType::Protocol(TcpProxyEntryTransportType::Tcp.as_str_name().to_string()); + t.key() == l.key && t.value() == l.value + })); + } } drop_insts(insts).await;