diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 98208fb..d2a2eae 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -5,6 +5,7 @@ use std::{ }; use crate::common::config::ProxyNetworkConfig; +use crate::common::stats_manager::StatsManager; use crate::common::token_bucket::TokenBucketManager; use crate::peers::acl_filter::AclFilter; use crate::proto::cli::PeerConnInfo; @@ -83,6 +84,8 @@ pub struct GlobalCtx { token_bucket_manager: TokenBucketManager, + stats_manager: Arc, + acl_filter: Arc, } @@ -151,6 +154,8 @@ impl GlobalCtx { token_bucket_manager: TokenBucketManager::new(), + stats_manager: Arc::new(StatsManager::new()), + acl_filter: Arc::new(AclFilter::new()), } } @@ -323,6 +328,10 @@ impl GlobalCtx { &self.token_bucket_manager } + pub fn stats_manager(&self) -> &Arc { + &self.stats_manager + } + pub fn get_acl_filter(&self) -> &Arc { &self.acl_filter } diff --git a/easytier/src/common/mod.rs b/easytier/src/common/mod.rs index 9bc5028..2178c96 100644 --- a/easytier/src/common/mod.rs +++ b/easytier/src/common/mod.rs @@ -22,6 +22,7 @@ pub mod ifcfg; pub mod netns; pub mod network; pub mod scoped_task; +pub mod stats_manager; pub mod stun; pub mod stun_codec_ext; pub mod token_bucket; diff --git a/easytier/src/common/stats_manager.rs b/easytier/src/common/stats_manager.rs new file mode 100644 index 0000000..fa20cf6 --- /dev/null +++ b/easytier/src/common/stats_manager.rs @@ -0,0 +1,879 @@ +use dashmap::DashMap; +use serde::{Deserialize, Serialize}; +use std::cell::UnsafeCell; +use std::fmt; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::time::interval; + +use crate::common::scoped_task::ScopedTask; + +/// Predefined metric names for type safety +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum MetricName { + /// RPC calls sent to peers + PeerRpcClientTx, + /// RPC calls received from peers + PeerRpcClientRx, + /// RPC calls sent to peers + PeerRpcServerTx, + /// RPC calls received from peers + PeerRpcServerRx, + /// RPC call duration in milliseconds + PeerRpcDuration, + /// RPC errors + PeerRpcErrors, + + /// Traffic bytes sent + TrafficBytesTx, + /// Traffic bytes received + TrafficBytesRx, + /// Traffic bytes forwarded + TrafficBytesForwarded, + /// Traffic bytes sent to self + TrafficBytesSelfTx, + /// Traffic bytes received from self + TrafficBytesSelfRx, + /// Traffic bytes forwarded for foreign network, rx to local + TrafficBytesForeignForwardRx, + /// Traffic bytes forwarded for foreign network, tx from local + TrafficBytesForeignForwardTx, + /// Traffic bytes forwarded for foreign network, forward + TrafficBytesForeignForwardForwarded, + + /// Traffic packets sent + TrafficPacketsTx, + /// Traffic packets received + TrafficPacketsRx, + /// Traffic packets forwarded + TrafficPacketsForwarded, + /// Traffic packets sent to self + TrafficPacketsSelfTx, + /// Traffic packets received from self + TrafficPacketsSelfRx, + /// Traffic packets forwarded for foreign network, rx to local + TrafficPacketsForeignForwardRx, + /// Traffic packets forwarded for foreign network, tx from local + TrafficPacketsForeignForwardTx, + /// Traffic packets forwarded for foreign network, forward + TrafficPacketsForeignForwardForwarded, + + /// Compression bytes before compression + CompressionBytesRxBefore, + /// Compression bytes after compression + CompressionBytesRxAfter, + /// Compression bytes before compression + CompressionBytesTxBefore, + /// Compression bytes after compression + CompressionBytesTxAfter, +} + +impl fmt::Display for MetricName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MetricName::PeerRpcClientTx => write!(f, "peer_rpc_client_tx"), + MetricName::PeerRpcClientRx => write!(f, "peer_rpc_client_rx"), + MetricName::PeerRpcServerTx => write!(f, "peer_rpc_server_tx"), + MetricName::PeerRpcServerRx => write!(f, "peer_rpc_server_rx"), + MetricName::PeerRpcDuration => write!(f, "peer_rpc_duration_ms"), + MetricName::PeerRpcErrors => write!(f, "peer_rpc_errors"), + + MetricName::TrafficBytesTx => write!(f, "traffic_bytes_tx"), + MetricName::TrafficBytesRx => write!(f, "traffic_bytes_rx"), + MetricName::TrafficBytesForwarded => write!(f, "traffic_bytes_forwarded"), + MetricName::TrafficBytesSelfTx => write!(f, "traffic_bytes_self_tx"), + MetricName::TrafficBytesSelfRx => write!(f, "traffic_bytes_self_rx"), + MetricName::TrafficBytesForeignForwardRx => { + write!(f, "traffic_bytes_foreign_forward_rx") + } + MetricName::TrafficBytesForeignForwardTx => { + write!(f, "traffic_bytes_foreign_forward_tx") + } + MetricName::TrafficBytesForeignForwardForwarded => { + write!(f, "traffic_bytes_foreign_forward_forwarded") + } + + MetricName::TrafficPacketsTx => write!(f, "traffic_packets_tx"), + MetricName::TrafficPacketsRx => write!(f, "traffic_packets_rx"), + MetricName::TrafficPacketsForwarded => write!(f, "traffic_packets_forwarded"), + MetricName::TrafficPacketsSelfTx => write!(f, "traffic_packets_self_tx"), + MetricName::TrafficPacketsSelfRx => write!(f, "traffic_packets_self_rx"), + MetricName::TrafficPacketsForeignForwardRx => { + write!(f, "traffic_packets_foreign_forward_rx") + } + MetricName::TrafficPacketsForeignForwardTx => { + write!(f, "traffic_packets_foreign_forward_tx") + } + MetricName::TrafficPacketsForeignForwardForwarded => { + write!(f, "traffic_packets_foreign_forward_forwarded") + } + + MetricName::CompressionBytesRxBefore => write!(f, "compression_bytes_rx_before"), + MetricName::CompressionBytesRxAfter => write!(f, "compression_bytes_rx_after"), + MetricName::CompressionBytesTxBefore => write!(f, "compression_bytes_tx_before"), + MetricName::CompressionBytesTxAfter => write!(f, "compression_bytes_tx_after"), + } + } +} + +/// Predefined label types for type safety +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum LabelType { + /// Network Name + NetworkName(String), + /// Source peer ID + SrcPeerId(u32), + /// Destination peer ID + DstPeerId(u32), + /// Service name + ServiceName(String), + /// Method name + MethodName(String), + /// Protocol type + Protocol(String), + /// Direction (tx/rx) + Direction(String), + /// Compression algorithm + CompressionAlgo(String), + /// Error type + ErrorType(String), + /// Status + Status(String), +} + +impl fmt::Display for LabelType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + LabelType::NetworkName(name) => write!(f, "network_name={}", name), + LabelType::SrcPeerId(id) => write!(f, "src_peer_id={}", id), + LabelType::DstPeerId(id) => write!(f, "dst_peer_id={}", id), + LabelType::ServiceName(name) => write!(f, "service_name={}", name), + LabelType::MethodName(name) => write!(f, "method_name={}", name), + LabelType::Protocol(proto) => write!(f, "protocol={}", proto), + LabelType::Direction(dir) => write!(f, "direction={}", dir), + LabelType::CompressionAlgo(algo) => write!(f, "compression_algo={}", algo), + LabelType::ErrorType(err) => write!(f, "error_type={}", err), + LabelType::Status(status) => write!(f, "status={}", status), + } + } +} + +impl LabelType { + pub fn key(&self) -> &'static str { + match self { + LabelType::NetworkName(_) => "network_name", + LabelType::SrcPeerId(_) => "src_peer_id", + LabelType::DstPeerId(_) => "dst_peer_id", + LabelType::ServiceName(_) => "service_name", + LabelType::MethodName(_) => "method_name", + LabelType::Protocol(_) => "protocol", + LabelType::Direction(_) => "direction", + LabelType::CompressionAlgo(_) => "compression_algo", + LabelType::ErrorType(_) => "error_type", + LabelType::Status(_) => "status", + } + } + + pub fn value(&self) -> String { + match self { + LabelType::NetworkName(name) => name.clone(), + LabelType::SrcPeerId(id) => id.to_string(), + LabelType::DstPeerId(id) => id.to_string(), + LabelType::ServiceName(name) => name.clone(), + LabelType::MethodName(name) => name.clone(), + LabelType::Protocol(proto) => proto.clone(), + LabelType::Direction(dir) => dir.clone(), + LabelType::CompressionAlgo(algo) => algo.clone(), + LabelType::ErrorType(err) => err.clone(), + LabelType::Status(status) => status.clone(), + } + } +} + +/// Label represents a key-value pair for metric identification +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Label { + pub key: String, + pub value: String, +} + +impl Label { + pub fn new(key: impl Into, value: impl Into) -> Self { + Self { + key: key.into(), + value: value.into(), + } + } + + pub fn from_label_type(label_type: &LabelType) -> Self { + Self { + key: label_type.key().to_string(), + value: label_type.value(), + } + } +} + +/// LabelSet represents a collection of labels for a metric +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct LabelSet { + labels: Vec