add stats metrics (#1207)

Support a new CLI command: `easytier-cli stats`.

This makes it easy to find out which components are consuming bandwidth.
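
With this change, `easytier-cli stats` (or `easytier-cli stats show`) prints the collected counters as a table, rendering byte metrics in human-readable sizes and duration metrics in milliseconds, while `easytier-cli stats prometheus` dumps the same counters in Prometheus exposition format for scraping or further processing.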
Authored by Sijie.Sun on 2025-08-09 00:06:35 +08:00, committed by GitHub
parent efa17a7c10
commit 8cdb27d43d
15 changed files with 1442 additions and 19 deletions

View File

@@ -5,6 +5,7 @@ use std::{
};
use crate::common::config::ProxyNetworkConfig;
use crate::common::stats_manager::StatsManager;
use crate::common::token_bucket::TokenBucketManager;
use crate::peers::acl_filter::AclFilter;
use crate::proto::cli::PeerConnInfo;
@@ -83,6 +84,8 @@ pub struct GlobalCtx {
token_bucket_manager: TokenBucketManager,
stats_manager: Arc<StatsManager>,
acl_filter: Arc<AclFilter>,
}
@@ -151,6 +154,8 @@ impl GlobalCtx {
token_bucket_manager: TokenBucketManager::new(),
stats_manager: Arc::new(StatsManager::new()),
acl_filter: Arc::new(AclFilter::new()),
}
}
@@ -323,6 +328,10 @@ impl GlobalCtx {
&self.token_bucket_manager
}
pub fn stats_manager(&self) -> &Arc<StatsManager> {
&self.stats_manager
}
pub fn get_acl_filter(&self) -> &Arc<AclFilter> {
&self.acl_filter
}

View File

@@ -22,6 +22,7 @@ pub mod ifcfg;
pub mod netns;
pub mod network;
pub mod scoped_task;
pub mod stats_manager;
pub mod stun;
pub mod stun_codec_ext;
pub mod token_bucket;

View File

@@ -0,0 +1,879 @@
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::interval;
use crate::common::scoped_task::ScopedTask;
/// Predefined metric names for type safety
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MetricName {
/// RPC requests sent by the local RPC client
PeerRpcClientTx,
/// RPC responses received by the local RPC client
PeerRpcClientRx,
/// RPC responses sent by the local RPC server
PeerRpcServerTx,
/// RPC requests received by the local RPC server
PeerRpcServerRx,
/// RPC call duration in milliseconds
PeerRpcDuration,
/// RPC errors
PeerRpcErrors,
/// Traffic bytes sent
TrafficBytesTx,
/// Traffic bytes received
TrafficBytesRx,
/// Traffic bytes forwarded
TrafficBytesForwarded,
/// Traffic bytes sent to self
TrafficBytesSelfTx,
/// Traffic bytes received from self
TrafficBytesSelfRx,
/// Traffic bytes forwarded for foreign network, rx to local
TrafficBytesForeignForwardRx,
/// Traffic bytes forwarded for foreign network, tx from local
TrafficBytesForeignForwardTx,
/// Traffic bytes forwarded for foreign network, forward
TrafficBytesForeignForwardForwarded,
/// Traffic packets sent
TrafficPacketsTx,
/// Traffic packets received
TrafficPacketsRx,
/// Traffic packets forwarded
TrafficPacketsForwarded,
/// Traffic packets sent to self
TrafficPacketsSelfTx,
/// Traffic packets received from self
TrafficPacketsSelfRx,
/// Traffic packets forwarded for foreign network, rx to local
TrafficPacketsForeignForwardRx,
/// Traffic packets forwarded for foreign network, tx from local
TrafficPacketsForeignForwardTx,
/// Traffic packets forwarded for foreign network, forward
TrafficPacketsForeignForwardForwarded,
/// Bytes received, measured before decompression
CompressionBytesRxBefore,
/// Bytes received, measured after decompression
CompressionBytesRxAfter,
/// Bytes to transmit, measured before compression
CompressionBytesTxBefore,
/// Bytes to transmit, measured after compression
CompressionBytesTxAfter,
}
impl fmt::Display for MetricName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MetricName::PeerRpcClientTx => write!(f, "peer_rpc_client_tx"),
MetricName::PeerRpcClientRx => write!(f, "peer_rpc_client_rx"),
MetricName::PeerRpcServerTx => write!(f, "peer_rpc_server_tx"),
MetricName::PeerRpcServerRx => write!(f, "peer_rpc_server_rx"),
MetricName::PeerRpcDuration => write!(f, "peer_rpc_duration_ms"),
MetricName::PeerRpcErrors => write!(f, "peer_rpc_errors"),
MetricName::TrafficBytesTx => write!(f, "traffic_bytes_tx"),
MetricName::TrafficBytesRx => write!(f, "traffic_bytes_rx"),
MetricName::TrafficBytesForwarded => write!(f, "traffic_bytes_forwarded"),
MetricName::TrafficBytesSelfTx => write!(f, "traffic_bytes_self_tx"),
MetricName::TrafficBytesSelfRx => write!(f, "traffic_bytes_self_rx"),
MetricName::TrafficBytesForeignForwardRx => {
write!(f, "traffic_bytes_foreign_forward_rx")
}
MetricName::TrafficBytesForeignForwardTx => {
write!(f, "traffic_bytes_foreign_forward_tx")
}
MetricName::TrafficBytesForeignForwardForwarded => {
write!(f, "traffic_bytes_foreign_forward_forwarded")
}
MetricName::TrafficPacketsTx => write!(f, "traffic_packets_tx"),
MetricName::TrafficPacketsRx => write!(f, "traffic_packets_rx"),
MetricName::TrafficPacketsForwarded => write!(f, "traffic_packets_forwarded"),
MetricName::TrafficPacketsSelfTx => write!(f, "traffic_packets_self_tx"),
MetricName::TrafficPacketsSelfRx => write!(f, "traffic_packets_self_rx"),
MetricName::TrafficPacketsForeignForwardRx => {
write!(f, "traffic_packets_foreign_forward_rx")
}
MetricName::TrafficPacketsForeignForwardTx => {
write!(f, "traffic_packets_foreign_forward_tx")
}
MetricName::TrafficPacketsForeignForwardForwarded => {
write!(f, "traffic_packets_foreign_forward_forwarded")
}
MetricName::CompressionBytesRxBefore => write!(f, "compression_bytes_rx_before"),
MetricName::CompressionBytesRxAfter => write!(f, "compression_bytes_rx_after"),
MetricName::CompressionBytesTxBefore => write!(f, "compression_bytes_tx_before"),
MetricName::CompressionBytesTxAfter => write!(f, "compression_bytes_tx_after"),
}
}
}
/// Predefined label types for type safety
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LabelType {
/// Network Name
NetworkName(String),
/// Source peer ID
SrcPeerId(u32),
/// Destination peer ID
DstPeerId(u32),
/// Service name
ServiceName(String),
/// Method name
MethodName(String),
/// Protocol type
Protocol(String),
/// Direction (tx/rx)
Direction(String),
/// Compression algorithm
CompressionAlgo(String),
/// Error type
ErrorType(String),
/// Status
Status(String),
}
impl fmt::Display for LabelType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LabelType::NetworkName(name) => write!(f, "network_name={}", name),
LabelType::SrcPeerId(id) => write!(f, "src_peer_id={}", id),
LabelType::DstPeerId(id) => write!(f, "dst_peer_id={}", id),
LabelType::ServiceName(name) => write!(f, "service_name={}", name),
LabelType::MethodName(name) => write!(f, "method_name={}", name),
LabelType::Protocol(proto) => write!(f, "protocol={}", proto),
LabelType::Direction(dir) => write!(f, "direction={}", dir),
LabelType::CompressionAlgo(algo) => write!(f, "compression_algo={}", algo),
LabelType::ErrorType(err) => write!(f, "error_type={}", err),
LabelType::Status(status) => write!(f, "status={}", status),
}
}
}
impl LabelType {
pub fn key(&self) -> &'static str {
match self {
LabelType::NetworkName(_) => "network_name",
LabelType::SrcPeerId(_) => "src_peer_id",
LabelType::DstPeerId(_) => "dst_peer_id",
LabelType::ServiceName(_) => "service_name",
LabelType::MethodName(_) => "method_name",
LabelType::Protocol(_) => "protocol",
LabelType::Direction(_) => "direction",
LabelType::CompressionAlgo(_) => "compression_algo",
LabelType::ErrorType(_) => "error_type",
LabelType::Status(_) => "status",
}
}
pub fn value(&self) -> String {
match self {
LabelType::NetworkName(name) => name.clone(),
LabelType::SrcPeerId(id) => id.to_string(),
LabelType::DstPeerId(id) => id.to_string(),
LabelType::ServiceName(name) => name.clone(),
LabelType::MethodName(name) => name.clone(),
LabelType::Protocol(proto) => proto.clone(),
LabelType::Direction(dir) => dir.clone(),
LabelType::CompressionAlgo(algo) => algo.clone(),
LabelType::ErrorType(err) => err.clone(),
LabelType::Status(status) => status.clone(),
}
}
}
/// Label represents a key-value pair for metric identification
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Label {
pub key: String,
pub value: String,
}
impl Label {
pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
Self {
key: key.into(),
value: value.into(),
}
}
pub fn from_label_type(label_type: &LabelType) -> Self {
Self {
key: label_type.key().to_string(),
value: label_type.value(),
}
}
}
/// LabelSet represents a collection of labels for a metric
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct LabelSet {
labels: Vec<Label>,
}
impl LabelSet {
pub fn new() -> Self {
Self { labels: Vec::new() }
}
pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.labels.push(Label::new(key, value));
self.labels.sort_by(|a, b| a.key.cmp(&b.key)); // Keep labels sorted for consistent hashing
self
}
/// Add a typed label to the set
pub fn with_label_type(mut self, label_type: LabelType) -> Self {
self.labels.push(Label::from_label_type(&label_type));
self.labels.sort_by(|a, b| a.key.cmp(&b.key)); // Keep labels sorted for consistent hashing
self
}
/// Create a LabelSet from multiple LabelTypes
pub fn from_label_types(label_types: &[LabelType]) -> Self {
let mut labels = Vec::new();
for label_type in label_types {
labels.push(Label::from_label_type(label_type));
}
labels.sort_by(|a, b| a.key.cmp(&b.key)); // Keep labels sorted for consistent hashing
Self { labels }
}
pub fn labels(&self) -> &[Label] {
&self.labels
}
/// Generate a string key for this label set
pub fn to_key(&self) -> String {
if self.labels.is_empty() {
return String::new();
}
let mut parts = Vec::with_capacity(self.labels.len());
for label in &self.labels {
parts.push(format!("{}={}", label.key, label.value));
}
parts.join(",")
}
}
impl Default for LabelSet {
fn default() -> Self {
Self::new()
}
}
/// UnsafeCounter provides a high-performance counter using UnsafeCell
#[derive(Debug)]
pub struct UnsafeCounter {
value: UnsafeCell<u64>,
}
impl UnsafeCounter {
pub fn new() -> Self {
Self {
value: UnsafeCell::new(0),
}
}
pub fn new_with_value(initial: u64) -> Self {
Self {
value: UnsafeCell::new(initial),
}
}
/// Increment the counter by the given amount
/// # Safety
/// This method is unsafe because it uses UnsafeCell. The caller must ensure
/// that no other thread is accessing this counter simultaneously.
pub unsafe fn add(&self, delta: u64) {
let ptr = self.value.get();
*ptr = (*ptr).saturating_add(delta);
}
/// Increment the counter by 1
/// # Safety
/// This method is unsafe because it uses UnsafeCell. The caller must ensure
/// that no other thread is accessing this counter simultaneously.
pub unsafe fn inc(&self) {
self.add(1);
}
/// Get the current value of the counter
/// # Safety
/// This method is unsafe because it uses UnsafeCell. The caller must ensure
/// that no other thread is modifying this counter simultaneously.
pub unsafe fn get(&self) -> u64 {
let ptr = self.value.get();
*ptr
}
/// Reset the counter to zero
/// # Safety
/// This method is unsafe because it uses UnsafeCell. The caller must ensure
/// that no other thread is accessing this counter simultaneously.
pub unsafe fn reset(&self) {
let ptr = self.value.get();
*ptr = 0;
}
/// Set the counter to a specific value
/// # Safety
/// This method is unsafe because it uses UnsafeCell. The caller must ensure
/// that no other thread is accessing this counter simultaneously.
pub unsafe fn set(&self, value: u64) {
let ptr = self.value.get();
*ptr = value;
}
}
// UnsafeCounter is Send + Sync because the safety is guaranteed by the caller
unsafe impl Send for UnsafeCounter {}
unsafe impl Sync for UnsafeCounter {}
/// MetricData contains both the counter and last update timestamp
/// Uses UnsafeCell for lock-free access
#[derive(Debug)]
struct MetricData {
counter: UnsafeCounter,
last_updated: UnsafeCell<Instant>,
}
impl MetricData {
fn new() -> Self {
Self {
counter: UnsafeCounter::new(),
last_updated: UnsafeCell::new(Instant::now()),
}
}
fn new_with_value(initial: u64) -> Self {
Self {
counter: UnsafeCounter::new_with_value(initial),
last_updated: UnsafeCell::new(Instant::now()),
}
}
/// Update the last_updated timestamp
/// # Safety
/// This method is unsafe because it uses UnsafeCell. The caller must ensure
/// that no other thread is accessing this timestamp simultaneously.
unsafe fn touch(&self) {
let ptr = self.last_updated.get();
*ptr = Instant::now();
}
/// Get the last updated timestamp
/// # Safety
/// This method is unsafe because it uses UnsafeCell. The caller must ensure
/// that no other thread is modifying this timestamp simultaneously.
unsafe fn get_last_updated(&self) -> Instant {
let ptr = self.last_updated.get();
*ptr
}
}
// MetricData is Send + Sync because the safety is guaranteed by the caller
unsafe impl Send for MetricData {}
unsafe impl Sync for MetricData {}
/// MetricKey uniquely identifies a metric with its name and labels
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MetricKey {
name: MetricName,
labels: LabelSet,
}
impl MetricKey {
fn new(name: MetricName, labels: LabelSet) -> Self {
Self { name, labels }
}
/// Generate a string representation for this metric key
fn to_string(&self) -> String {
let label_str = self.labels.to_key();
if label_str.is_empty() {
self.name.to_string()
} else {
format!("{}[{}]", self.name, label_str)
}
}
}
/// CounterHandle provides a safe interface to a MetricData
/// It ensures thread-local access patterns for performance
#[derive(Clone)]
pub struct CounterHandle {
metric_data: Arc<MetricData>,
_key: MetricKey, // Keep key for debugging purposes
}
impl CounterHandle {
fn new(metric_data: Arc<MetricData>, key: MetricKey) -> Self {
Self {
metric_data,
_key: key,
}
}
/// Increment the counter by the given amount
pub fn add(&self, delta: u64) {
unsafe {
self.metric_data.counter.add(delta);
self.metric_data.touch();
}
}
/// Increment the counter by 1
pub fn inc(&self) {
unsafe {
self.metric_data.counter.inc();
self.metric_data.touch();
}
}
/// Get the current value of the counter
pub fn get(&self) -> u64 {
unsafe { self.metric_data.counter.get() }
}
/// Reset the counter to zero
pub fn reset(&self) {
unsafe {
self.metric_data.counter.reset();
self.metric_data.touch();
}
}
/// Set the counter to a specific value
pub fn set(&self, value: u64) {
unsafe {
self.metric_data.counter.set(value);
self.metric_data.touch();
}
}
}
/// MetricSnapshot represents a point-in-time view of a metric
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricSnapshot {
pub name: MetricName,
pub labels: LabelSet,
pub value: u64,
}
impl MetricSnapshot {
pub fn name_str(&self) -> String {
self.name.to_string()
}
}
/// StatsManager manages global statistics with high-performance counters
pub struct StatsManager {
counters: Arc<DashMap<MetricKey, Arc<MetricData>>>,
cleanup_task: ScopedTask<()>,
}
impl StatsManager {
/// Create a new StatsManager
pub fn new() -> Self {
let counters = Arc::new(DashMap::new());
// Spawn the background cleanup task (requires a tokio runtime). It holds only a
// weak reference to the counters map, so it exits once the StatsManager is dropped.
let counters_clone = Arc::downgrade(&counters.clone());
let cleanup_task = tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(60)); // Check every minute
loop {
interval.tick().await;
let cutoff_time = Instant::now() - Duration::from_secs(180); // 3 minutes
let Some(counters) = counters_clone.upgrade() else {
break;
};
// Remove entries that haven't been updated for 3 minutes
counters.retain(|_, metric_data: &mut Arc<MetricData>| unsafe {
metric_data.get_last_updated() > cutoff_time
});
}
});
Self {
counters,
cleanup_task: cleanup_task.into(),
}
}
/// Get or create a counter with the given name and labels
pub fn get_counter(&self, name: MetricName, labels: LabelSet) -> CounterHandle {
let key = MetricKey::new(name, labels);
let metric_data = self
.counters
.entry(key.clone())
.or_insert_with(|| Arc::new(MetricData::new()))
.clone();
CounterHandle::new(metric_data, key)
}
/// Get a counter with no labels
pub fn get_simple_counter(&self, name: MetricName) -> CounterHandle {
self.get_counter(name, LabelSet::new())
}
/// Get all metric snapshots
pub fn get_all_metrics(&self) -> Vec<MetricSnapshot> {
let mut metrics = Vec::new();
for entry in self.counters.iter() {
let key = entry.key();
let metric_data = entry.value();
let value = unsafe { metric_data.counter.get() };
metrics.push(MetricSnapshot {
name: key.name,
labels: key.labels.clone(),
value,
});
}
// Sort by metric name and then by labels for consistent output
metrics.sort_by(|a, b| {
a.name
.to_string()
.cmp(&b.name.to_string())
.then_with(|| a.labels.to_key().cmp(&b.labels.to_key()))
});
metrics
}
/// Get metrics filtered by name prefix
pub fn get_metrics_by_prefix(&self, prefix: &str) -> Vec<MetricSnapshot> {
self.get_all_metrics()
.into_iter()
.filter(|m| m.name.to_string().starts_with(prefix))
.collect()
}
/// Get a specific metric by name and labels
pub fn get_metric(&self, name: MetricName, labels: &LabelSet) -> Option<MetricSnapshot> {
let key = MetricKey::new(name, labels.clone());
if let Some(metric_data) = self.counters.get(&key) {
let value = unsafe { metric_data.counter.get() };
Some(MetricSnapshot {
name,
labels: labels.clone(),
value,
})
} else {
None
}
}
/// Clear all metrics
pub fn clear(&self) {
self.counters.clear();
}
/// Get the number of tracked metrics
pub fn metric_count(&self) -> usize {
self.counters.len()
}
/// Export metrics in Prometheus format
pub fn export_prometheus(&self) -> String {
let metrics = self.get_all_metrics();
let mut output = String::new();
let mut current_metric = String::new();
for metric in metrics {
let metric_name_str = metric.name.to_string();
if metric_name_str != current_metric {
if !current_metric.is_empty() {
output.push('\n');
}
output.push_str(&format!("# TYPE {} counter\n", metric_name_str));
current_metric = metric_name_str.clone();
}
if metric.labels.labels().is_empty() {
output.push_str(&format!("{} {}\n", metric_name_str, metric.value));
} else {
let label_str = metric
.labels
.labels()
.iter()
.map(|l| format!("{}=\"{}\"", l.key, l.value))
.collect::<Vec<_>>()
.join(",");
output.push_str(&format!(
"{}{{{}}} {}\n",
metric_name_str, label_str, metric.value
));
}
}
output
}
}
impl Default for StatsManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::stats_manager::{LabelSet, LabelType, MetricName, StatsManager};
use crate::proto::cli::{
GetPrometheusStatsRequest, GetPrometheusStatsResponse, GetStatsRequest, GetStatsResponse,
};
use std::collections::BTreeMap;
#[tokio::test]
async fn test_label_set() {
let labels = LabelSet::new()
.with_label("peer_id", "peer1")
.with_label("method", "ping");
assert_eq!(labels.to_key(), "method=ping,peer_id=peer1");
}
#[tokio::test]
async fn test_unsafe_counter() {
let counter = UnsafeCounter::new();
unsafe {
assert_eq!(counter.get(), 0);
counter.inc();
assert_eq!(counter.get(), 1);
counter.add(5);
assert_eq!(counter.get(), 6);
counter.set(10);
assert_eq!(counter.get(), 10);
counter.reset();
assert_eq!(counter.get(), 0);
}
}
#[tokio::test]
async fn test_stats_manager() {
let stats = StatsManager::new();
// Test simple counter
let counter1 = stats.get_simple_counter(MetricName::PeerRpcClientTx);
counter1.inc();
counter1.add(5);
// Test counter with labels
let labels = LabelSet::new()
.with_label("peer_id", "peer1")
.with_label("method", "ping");
let counter2 = stats.get_counter(MetricName::PeerRpcClientTx, labels.clone());
counter2.add(3);
// Check metrics
let metrics = stats.get_all_metrics();
assert_eq!(metrics.len(), 2);
// Find the simple counter
let simple_metric = metrics
.iter()
.find(|m| m.labels.labels().is_empty())
.unwrap();
assert_eq!(simple_metric.name, MetricName::PeerRpcClientTx);
assert_eq!(simple_metric.value, 6);
// Find the labeled counter
let labeled_metric = metrics
.iter()
.find(|m| !m.labels.labels().is_empty())
.unwrap();
assert_eq!(labeled_metric.name, MetricName::PeerRpcClientTx);
assert_eq!(labeled_metric.value, 3);
assert_eq!(labeled_metric.labels, labels);
}
#[tokio::test]
async fn test_prometheus_export() {
let stats = StatsManager::new();
let counter1 = stats.get_simple_counter(MetricName::TrafficBytesTx);
counter1.set(100);
let labels = LabelSet::new().with_label("status", "success");
let counter2 = stats.get_counter(MetricName::PeerRpcClientTx, labels);
counter2.set(50);
let prometheus_output = stats.export_prometheus();
assert!(prometheus_output.contains("# TYPE peer_rpc_client_tx counter"));
assert!(prometheus_output.contains("peer_rpc_client_tx{status=\"success\"} 50"));
assert!(prometheus_output.contains("# TYPE traffic_bytes_tx counter"));
assert!(prometheus_output.contains("traffic_bytes_tx 100"));
}
#[tokio::test]
async fn test_get_metric() {
let stats = StatsManager::new();
let labels = LabelSet::new().with_label("peer", "test");
let counter = stats.get_counter(MetricName::PeerRpcClientTx, labels.clone());
counter.set(42);
let metric = stats
.get_metric(MetricName::PeerRpcClientTx, &labels)
.unwrap();
assert_eq!(metric.value, 42);
let non_existent = stats.get_metric(MetricName::PeerRpcErrors, &LabelSet::new());
assert!(non_existent.is_none());
}
#[tokio::test]
async fn test_metrics_by_prefix() {
let stats = StatsManager::new();
stats
.get_simple_counter(MetricName::PeerRpcClientTx)
.set(10);
stats.get_simple_counter(MetricName::PeerRpcErrors).set(2);
stats
.get_simple_counter(MetricName::TrafficBytesTx)
.set(100);
let rpc_metrics = stats.get_metrics_by_prefix("peer_rpc");
assert_eq!(rpc_metrics.len(), 2);
let traffic_metrics = stats.get_metrics_by_prefix("traffic_");
assert_eq!(traffic_metrics.len(), 1);
}
#[tokio::test]
async fn test_cleanup_mechanism() {
let stats = StatsManager::new();
// Create some counters
let counter1 = stats.get_simple_counter(MetricName::PeerRpcClientTx);
counter1.set(10);
let labels = LabelSet::new().with_label("test", "value");
let counter2 = stats.get_counter(MetricName::TrafficBytesTx, labels);
counter2.set(20);
// Verify that the counters exist
assert_eq!(stats.metric_count(), 2);
// Note: a real cleanup test would have to wait 3 minutes, which is impractical in a unit test.
// Here we only verify that the basic structure of the cleanup mechanism is in place.
// The cleanup logic runs in a background task and automatically removes entries that have not been updated for more than 3 minutes.
// Verify that the counters still work correctly
counter1.inc();
assert_eq!(counter1.get(), 11);
counter2.add(5);
assert_eq!(counter2.get(), 25);
}
#[tokio::test]
async fn test_stats_rpc_data_structures() {
// Test GetStatsRequest
let request = GetStatsRequest {};
assert_eq!(request, GetStatsRequest {});
// Test GetStatsResponse
let response = GetStatsResponse { metrics: vec![] };
assert!(response.metrics.is_empty());
// Test GetPrometheusStatsRequest
let prometheus_request = GetPrometheusStatsRequest {};
assert_eq!(prometheus_request, GetPrometheusStatsRequest {});
// Test GetPrometheusStatsResponse
let prometheus_response = GetPrometheusStatsResponse {
prometheus_text: "# Test metrics\n".to_string(),
};
assert_eq!(prometheus_response.prometheus_text, "# Test metrics\n");
}
#[tokio::test]
async fn test_metric_snapshot_creation() {
let stats_manager = StatsManager::new();
// Create some test metrics
let counter1 = stats_manager.get_counter(
MetricName::PeerRpcClientTx,
LabelSet::new()
.with_label_type(LabelType::SrcPeerId(123))
.with_label_type(LabelType::ServiceName("test_service".to_string())),
);
counter1.add(100);
let counter2 = stats_manager.get_counter(
MetricName::TrafficBytesTx,
LabelSet::new().with_label_type(LabelType::Protocol("tcp".to_string())),
);
counter2.add(1024);
// Get all metrics
let metrics = stats_manager.get_all_metrics();
assert_eq!(metrics.len(), 2);
// Verify the metrics can be converted to the format expected by RPC
for metric in metrics {
let mut labels = BTreeMap::new();
for label in metric.labels.labels() {
labels.insert(label.key.clone(), label.value.clone());
}
// This simulates what the RPC service would do
let _metric_snapshot = crate::proto::cli::MetricSnapshot {
name: metric.name.to_string(),
value: metric.value,
labels,
};
}
}
#[tokio::test]
async fn test_prometheus_export_format() {
let stats_manager = StatsManager::new();
// Create test metrics
let counter = stats_manager.get_counter(
MetricName::PeerRpcClientTx,
LabelSet::new()
.with_label_type(LabelType::SrcPeerId(123))
.with_label_type(LabelType::ServiceName("test".to_string())),
);
counter.add(42);
// Export to Prometheus format
let prometheus_text = stats_manager.export_prometheus();
println!("{}", prometheus_text);
// Verify the format
assert!(prometheus_text.contains("peer_rpc_client_tx"));
assert!(prometheus_text.contains("42"));
assert!(prometheus_text.contains("src_peer_id=\"123\""));
assert!(prometheus_text.contains("service_name=\"test\""));
}
}
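
To make the intended usage pattern easier to see at a glance, here is a minimal sketch written in the style of the unit tests above. The "home" network name is purely illustrative, and inside EasyTier the manager is obtained from `global_ctx.stats_manager()` rather than constructed directly.

use crate::common::stats_manager::{LabelSet, LabelType, MetricName, StatsManager};

#[tokio::test]
async fn stats_manager_usage_sketch() {
    // StatsManager::new() spawns its background cleanup task, so it needs a tokio runtime.
    let stats = StatsManager::new();

    // Counters are keyed by (MetricName, LabelSet); asking for the same pair again
    // returns a handle to the same underlying slot.
    let labels = LabelSet::new().with_label_type(LabelType::NetworkName("home".to_string()));
    let tx_bytes = stats.get_counter(MetricName::TrafficBytesTx, labels.clone());

    // Hot-path updates are plain method calls on the cheap-to-clone handle.
    tx_bytes.add(1500);
    tx_bytes.inc();
    assert_eq!(tx_bytes.get(), 1501);

    // The same data can be read back as snapshots or exported as Prometheus text.
    let snapshot = stats.get_metric(MetricName::TrafficBytesTx, &labels).unwrap();
    assert_eq!(snapshot.value, 1501);
    let text = stats.export_prometheus();
    assert!(text.contains("traffic_bytes_tx{network_name=\"home\"} 1501"));
}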

View File

@@ -30,13 +30,13 @@ use easytier::{
cli::{
list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, AddPortForwardRequest,
ConnectorManageRpc, ConnectorManageRpcClientFactory, DumpRouteRequest,
GetAclStatsRequest, GetVpnPortalInfoRequest, GetWhitelistRequest, ListConnectorRequest,
GetAclStatsRequest, GetPrometheusStatsRequest, GetStatsRequest, GetVpnPortalInfoRequest, GetWhitelistRequest, ListConnectorRequest,
ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListMappedListenerRequest,
ListPeerRequest, ListPeerResponse, ListPortForwardRequest, ListRouteRequest,
ListRouteResponse, ManageMappedListenerRequest, MappedListenerManageAction,
MappedListenerManageRpc, MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc,
PeerManageRpcClientFactory, PortForwardManageRpc, PortForwardManageRpcClientFactory,
RemovePortForwardRequest, SetWhitelistRequest, ShowNodeInfoRequest, TcpProxyEntryState,
RemovePortForwardRequest, SetWhitelistRequest, ShowNodeInfoRequest, StatsRpc, StatsRpcClientFactory, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc,
VpnPortalRpcClientFactory,
},
@@ -102,6 +102,8 @@ enum SubCommand {
PortForward(PortForwardArgs),
#[command(about = "manage TCP/UDP whitelist")]
Whitelist(WhitelistArgs),
#[command(about = "show statistics information")]
Stats(StatsArgs),
#[command(about = t!("core_clap.generate_completions").to_string())]
GenAutocomplete { shell: Shell },
}
@@ -255,6 +257,20 @@ enum WhitelistSubCommand {
Show,
}
#[derive(Args, Debug)]
struct StatsArgs {
#[command(subcommand)]
sub_command: Option<StatsSubCommand>,
}
#[derive(Subcommand, Debug)]
enum StatsSubCommand {
/// Show general statistics
Show,
/// Show statistics in Prometheus format
Prometheus,
}
#[derive(Args, Debug)]
struct ServiceArgs {
#[arg(short, long, default_value = env!("CARGO_PKG_NAME"), help = "service name")]
@@ -414,6 +430,18 @@ impl CommandHandler<'_> {
.with_context(|| "failed to get port forward manager client")?)
}
async fn get_stats_client(
&self,
) -> Result<Box<dyn StatsRpc<Controller = BaseController>>, Error> {
Ok(self
.client
.lock()
.unwrap()
.scoped_client::<StatsRpcClientFactory<BaseController>>("".to_string())
.await
.with_context(|| "failed to get stats client")?)
}
async fn list_peers(&self) -> Result<ListPeerResponse, Error> {
let client = self.get_peer_manager_client().await?;
let request = ListPeerRequest::default();
@@ -1879,6 +1907,71 @@ async fn main() -> Result<(), Error> {
handler.handle_whitelist_show().await?;
}
},
SubCommand::Stats(stats_args) => match &stats_args.sub_command {
Some(StatsSubCommand::Show) | None => {
let client = handler.get_stats_client().await?;
let request = GetStatsRequest {};
let response = client
.get_stats(BaseController::default(), request)
.await?;
if cli.output_format == OutputFormat::Json {
println!("{}", serde_json::to_string_pretty(&response.metrics)?);
} else {
#[derive(tabled::Tabled, serde::Serialize)]
struct StatsTableRow {
#[tabled(rename = "Metric Name")]
name: String,
#[tabled(rename = "Value")]
value: String,
#[tabled(rename = "Labels")]
labels: String,
}
let table_rows: Vec<StatsTableRow> = response
.metrics
.iter()
.map(|metric| {
let labels_str = if metric.labels.is_empty() {
"-".to_string()
} else {
metric
.labels
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(", ")
};
let formatted_value = if metric.name.contains("bytes") {
format_size(metric.value, humansize::BINARY)
} else if metric.name.contains("duration") {
format!("{} ms", metric.value)
} else {
metric.value.to_string()
};
StatsTableRow {
name: metric.name.clone(),
value: formatted_value,
labels: labels_str,
}
})
.collect();
print_output(&table_rows, &cli.output_format)?
}
}
Some(StatsSubCommand::Prometheus) => {
let client = handler.get_stats_client().await?;
let request = GetPrometheusStatsRequest {};
let response = client
.get_prometheus_stats(BaseController::default(), request)
.await?;
println!("{}", response.prometheus_text);
}
},
SubCommand::GenAutocomplete { shell } => {
let mut cmd = Cli::command();
easytier::print_completions(shell, &mut cmd, "easytier-cli");

View File

@@ -31,11 +31,12 @@ use crate::peers::rpc_service::PeerManagerRpcService;
use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver};
use crate::proto::cli::VpnPortalRpc;
use crate::proto::cli::{
AddPortForwardRequest, AddPortForwardResponse, ListMappedListenerRequest,
AddPortForwardRequest, AddPortForwardResponse, GetPrometheusStatsRequest,
GetPrometheusStatsResponse, GetStatsRequest, GetStatsResponse, ListMappedListenerRequest,
ListMappedListenerResponse, ListPortForwardRequest, ListPortForwardResponse,
ManageMappedListenerRequest, ManageMappedListenerResponse, MappedListener,
MappedListenerManageAction, MappedListenerManageRpc, PortForwardManageRpc,
RemovePortForwardRequest, RemovePortForwardResponse,
MappedListenerManageAction, MappedListenerManageRpc, MetricSnapshot, PortForwardManageRpc,
RemovePortForwardRequest, RemovePortForwardResponse, StatsRpc,
};
use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo};
use crate::proto::common::{PortForwardConfigPb, TunnelInfo};
@@ -872,6 +873,60 @@ impl Instance {
}
}
fn get_stats_rpc_service(&self) -> impl StatsRpc<Controller = BaseController> + Clone {
#[derive(Clone)]
pub struct StatsRpcService {
global_ctx: ArcGlobalCtx,
}
#[async_trait::async_trait]
impl StatsRpc for StatsRpcService {
type Controller = BaseController;
async fn get_stats(
&self,
_: BaseController,
_request: GetStatsRequest,
) -> Result<GetStatsResponse, rpc_types::error::Error> {
let stats_manager = self.global_ctx.stats_manager();
let snapshots = stats_manager.get_all_metrics();
let metrics = snapshots
.into_iter()
.map(|snapshot| {
let mut labels = std::collections::BTreeMap::new();
for label in snapshot.labels.labels() {
labels.insert(label.key.clone(), label.value.clone());
}
MetricSnapshot {
name: snapshot.name_str(),
value: snapshot.value,
labels,
}
})
.collect();
Ok(GetStatsResponse { metrics })
}
async fn get_prometheus_stats(
&self,
_: BaseController,
_request: GetPrometheusStatsRequest,
) -> Result<GetPrometheusStatsResponse, rpc_types::error::Error> {
let stats_manager = self.global_ctx.stats_manager();
let prometheus_text = stats_manager.export_prometheus();
Ok(GetPrometheusStatsResponse { prometheus_text })
}
}
StatsRpcService {
global_ctx: self.global_ctx.clone(),
}
}
async fn run_rpc_server(&mut self) -> Result<(), Error> {
let Some(_) = self.global_ctx.config.get_rpc_portal() else {
tracing::info!("rpc server not enabled, because rpc_portal is not set.");
@@ -886,6 +941,7 @@ impl Instance {
let vpn_portal_rpc = self.get_vpn_portal_rpc_service();
let mapped_listener_manager_rpc = self.get_mapped_listener_manager_rpc_service();
let port_forward_manager_rpc = self.get_port_forward_manager_rpc_service();
let stats_rpc_service = self.get_stats_rpc_service();
let s = self.rpc_server.as_mut().unwrap();
let peer_mgr_rpc_service = PeerManagerRpcService::new(peer_mgr.clone());
@@ -910,6 +966,10 @@ impl Instance {
PortForwardManageRpcServer::new(port_forward_manager_rpc),
"",
);
s.registry().register(
crate::proto::cli::StatsRpcServer::new(stats_rpc_service),
"",
);
if let Some(ip_proxy) = self.ip_proxy.as_ref() {
s.registry().register(

View File

@@ -25,6 +25,7 @@ use crate::{
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent, NetworkIdentity},
join_joinset_background,
stats_manager::{LabelSet, LabelType, MetricName, StatsManager},
token_bucket::TokenBucket,
PeerId,
},
@@ -75,6 +76,8 @@ struct ForeignNetworkEntry {
peer_center: Arc<PeerCenterInstance>,
stats_mgr: Arc<StatsManager>,
tasks: Mutex<JoinSet<()>>,
pub lock: Mutex<()>,
@@ -89,6 +92,7 @@ impl ForeignNetworkEntry {
relay_data: bool,
pm_packet_sender: PacketRecvChan,
) -> Self {
let stats_mgr = global_ctx.stats_manager().clone();
let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone());
let (packet_sender, packet_recv) = create_packet_recv_chan();
@@ -141,6 +145,8 @@ impl ForeignNetworkEntry {
bps_limiter,
stats_mgr,
tasks: Mutex::new(JoinSet::new()),
peer_center,
@@ -297,8 +303,24 @@ impl ForeignNetworkEntry {
let network_name = self.network.network_name.clone();
let bps_limiter = self.bps_limiter.clone();
let label_set =
LabelSet::new().with_label_type(LabelType::NetworkName(network_name.clone()));
let forward_bytes = self
.stats_mgr
.get_counter(MetricName::TrafficBytesForwarded, label_set.clone());
let forward_packets = self
.stats_mgr
.get_counter(MetricName::TrafficPacketsForwarded, label_set.clone());
let rx_bytes = self
.stats_mgr
.get_counter(MetricName::TrafficBytesSelfRx, label_set.clone());
let rx_packets = self
.stats_mgr
.get_counter(MetricName::TrafficPacketsRx, label_set.clone());
self.tasks.lock().await.spawn(async move {
while let Ok(zc_packet) = recv_packet_from_chan(&mut recv).await {
let buf_len = zc_packet.buf_len();
let Some(hdr) = zc_packet.peer_manager_header() else {
tracing::warn!("invalid packet, skip");
continue;
@@ -310,6 +332,8 @@ impl ForeignNetworkEntry {
|| hdr.packet_type == PacketType::RpcReq as u8
|| hdr.packet_type == PacketType::RpcResp as u8
{
rx_bytes.add(buf_len as u64);
rx_packets.inc();
rpc_sender.send(zc_packet).unwrap();
continue;
}
@@ -327,6 +351,9 @@ impl ForeignNetworkEntry {
}
}
forward_bytes.add(buf_len as u64);
forward_packets.inc();
let gateway_peer_id = peer_map
.get_gateway_peer_id(to_peer_id, NextHopPolicy::LeastHop)
.await;

View File

@@ -8,6 +8,7 @@ use std::{
},
};
use arc_swap::ArcSwapOption;
use futures::{StreamExt, TryFutureExt};
use prost::Message;
@@ -27,6 +28,7 @@ use crate::{
defer,
error::Error,
global_ctx::ArcGlobalCtx,
stats_manager::{CounterHandle, LabelSet, LabelType, MetricName},
PeerId,
},
proto::{
@@ -85,6 +87,13 @@ impl PeerConnCloseNotify {
}
}
struct PeerConnCounter {
traffic_tx_bytes: CounterHandle,
traffic_rx_bytes: CounterHandle,
traffic_tx_packets: CounterHandle,
traffic_rx_packets: CounterHandle,
}
pub struct PeerConn {
conn_id: PeerConnId,
@@ -111,6 +120,8 @@ pub struct PeerConn {
latency_stats: Arc<WindowLatency>,
throughput: Arc<Throughput>,
loss_rate_stats: Arc<AtomicU32>,
counters: ArcSwapOption<PeerConnCounter>,
}
impl Debug for PeerConn {
@@ -164,6 +175,8 @@ impl PeerConn {
latency_stats: Arc::new(WindowLatency::new(15)),
throughput,
loss_rate_stats: Arc::new(AtomicU32::new(0)),
counters: ArcSwapOption::new(None),
}
}
@@ -362,6 +375,22 @@ impl PeerConn {
let ctrl_sender = self.ctrl_resp_sender.clone();
let conn_info_for_instrument = self.get_conn_info();
let stats_mgr = self.global_ctx.stats_manager();
let label_set = LabelSet::new().with_label_type(LabelType::NetworkName(
conn_info_for_instrument.network_name.clone(),
));
let counters = PeerConnCounter {
traffic_tx_bytes: stats_mgr.get_counter(MetricName::TrafficBytesTx, label_set.clone()),
traffic_rx_bytes: stats_mgr.get_counter(MetricName::TrafficBytesRx, label_set.clone()),
traffic_tx_packets: stats_mgr
.get_counter(MetricName::TrafficPacketsTx, label_set.clone()),
traffic_rx_packets: stats_mgr
.get_counter(MetricName::TrafficPacketsRx, label_set.clone()),
};
self.counters.store(Some(Arc::new(counters)));
let counters = self.counters.load_full().unwrap();
self.tasks.spawn(
async move {
tracing::info!("start recving peer conn packet");
@@ -374,6 +403,10 @@ impl PeerConn {
}
let mut zc_packet = ret.unwrap();
counters.traffic_rx_bytes.add(zc_packet.buf_len() as u64);
counters.traffic_rx_packets.inc();
let Some(peer_mgr_hdr) = zc_packet.mut_peer_manager_header() else {
tracing::error!(
"unexpected packet: {:?}, cannot decode peer manager hdr",
@@ -436,6 +469,11 @@ impl PeerConn {
}
pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> {
let counters = self.counters.load();
if let Some(ref counters) = *counters {
counters.traffic_tx_bytes.add(msg.buf_len() as u64);
counters.traffic_tx_packets.inc();
}
Ok(self.sink.send(msg).await?)
}

View File

@@ -24,6 +24,7 @@ use crate::{
constants::EASYTIER_VERSION,
error::Error,
global_ctx::{ArcGlobalCtx, NetworkIdentity},
stats_manager::{CounterHandle, LabelSet, LabelType, MetricName},
stun::StunInfoCollectorTrait,
PeerId,
},
@@ -116,6 +117,13 @@ enum RouteAlgoInst {
None,
}
struct SelfTxCounters {
self_tx_packets: CounterHandle,
self_tx_bytes: CounterHandle,
compress_tx_bytes_before: CounterHandle,
compress_tx_bytes_after: CounterHandle,
}
pub struct PeerManager {
my_peer_id: PeerId,
@@ -147,6 +155,8 @@ pub struct PeerManager {
reserved_my_peer_id_map: DashMap<String, PeerId>,
allow_loopback_tunnel: AtomicBool,
self_tx_counters: SelfTxCounters,
}
impl Debug for PeerManager {
@@ -214,7 +224,10 @@ impl PeerManager {
peer_rpc_tspt_sender,
encryptor: encryptor.clone(),
});
let peer_rpc_mgr = Arc::new(PeerRpcManager::new(rpc_tspt.clone()));
let peer_rpc_mgr = Arc::new(PeerRpcManager::new_with_stats_manager(
rpc_tspt.clone(),
global_ctx.stats_manager().clone(),
));
let route_algo_inst = match route_algo {
RouteAlgoType::Ospf => RouteAlgoInst::Ospf(PeerRoute::new(
@@ -246,6 +259,30 @@ impl PeerManager {
let exit_nodes = global_ctx.config.get_exit_nodes();
let stats_manager = global_ctx.stats_manager();
let self_tx_counters = SelfTxCounters {
self_tx_packets: stats_manager.get_counter(
MetricName::TrafficPacketsSelfTx,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
),
self_tx_bytes: stats_manager.get_counter(
MetricName::TrafficBytesSelfTx,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
),
compress_tx_bytes_before: stats_manager.get_counter(
MetricName::CompressionBytesTxBefore,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
),
compress_tx_bytes_after: stats_manager.get_counter(
MetricName::CompressionBytesTxAfter,
LabelSet::new()
.with_label_type(LabelType::NetworkName(global_ctx.get_network_name())),
),
};
PeerManager {
my_peer_id,
@@ -277,6 +314,8 @@ impl PeerManager {
reserved_my_peer_id_map: DashMap::new(),
allow_loopback_tunnel: AtomicBool::new(true),
self_tx_counters,
}
}
@@ -507,9 +546,24 @@ impl PeerManager {
let foreign_network_my_peer_id =
foreign_network_mgr.get_network_peer_id(&foreign_network_name);
let buf_len = packet.buf_len();
let stats_manager = peer_map.get_global_ctx().stats_manager().clone();
let label_set =
LabelSet::new().with_label_type(LabelType::NetworkName(foreign_network_name.clone()));
let add_counter = move |bytes_metric, packets_metric| {
stats_manager
.get_counter(bytes_metric, label_set.clone())
.add(buf_len as u64);
stats_manager.get_counter(packets_metric, label_set).inc();
};
// NOTICE: the sender has already rewritten to_peer_id from the foreign-network my_peer_id to the original my_peer_id
if to_peer_id == my_peer_id {
// packet sent from other peer to me, extract the inner packet and forward it
add_counter(
MetricName::TrafficBytesForeignForwardRx,
MetricName::TrafficPacketsForeignForwardRx,
);
if let Err(e) = foreign_network_mgr
.send_msg_to_peer(
&foreign_network_name,
@@ -540,6 +594,11 @@ impl PeerManager {
return Err(packet);
};
add_counter(
MetricName::TrafficBytesForeignForwardTx,
MetricName::TrafficPacketsForeignForwardTx,
);
// modify the to_peer id from foreign network my peer id to the origin my peer id
packet
.mut_peer_manager_header()
@@ -558,10 +617,13 @@ impl PeerManager {
"send_msg_directly failed when forward local generated foreign network packet"
);
}
Ok(())
} else {
// target is not me, forward it. try get origin peer id
add_counter(
MetricName::TrafficBytesForeignForwardForwarded,
MetricName::TrafficPacketsForeignForwardForwarded,
);
Err(packet)
}
}
@@ -577,6 +639,29 @@ impl PeerManager {
let compress_algo = self.data_compress_algo;
let acl_filter = self.global_ctx.get_acl_filter().clone();
let global_ctx = self.global_ctx.clone();
let stats_mgr = self.global_ctx.stats_manager().clone();
let label_set =
LabelSet::new().with_label_type(LabelType::NetworkName(global_ctx.get_network_name()));
let self_tx_bytes = self.self_tx_counters.self_tx_bytes.clone();
let self_tx_packets = self.self_tx_counters.self_tx_packets.clone();
let self_rx_bytes =
stats_mgr.get_counter(MetricName::TrafficBytesSelfRx, label_set.clone());
let self_rx_packets =
stats_mgr.get_counter(MetricName::TrafficPacketsSelfRx, label_set.clone());
let forward_tx_bytes =
stats_mgr.get_counter(MetricName::TrafficBytesForwarded, label_set.clone());
let forward_tx_packets =
stats_mgr.get_counter(MetricName::TrafficPacketsForwarded, label_set.clone());
let compress_tx_bytes_before = self.self_tx_counters.compress_tx_bytes_before.clone();
let compress_tx_bytes_after = self.self_tx_counters.compress_tx_bytes_after.clone();
let compress_rx_bytes_before =
stats_mgr.get_counter(MetricName::CompressionBytesRxBefore, label_set.clone());
let compress_rx_bytes_after =
stats_mgr.get_counter(MetricName::CompressionBytesRxAfter, label_set.clone());
self.tasks.lock().await.spawn(async move {
tracing::trace!("start_peer_recv");
while let Ok(ret) = recv_packet_from_chan(&mut recv).await {
@@ -587,6 +672,7 @@ impl PeerManager {
continue;
};
let buf_len = ret.buf_len();
let Some(hdr) = ret.mut_peer_manager_header() else {
tracing::warn!(?ret, "invalid packet, skip");
continue;
@@ -608,13 +694,24 @@ impl PeerManager {
hdr.forward_counter += 1;
if from_peer_id == my_peer_id
&& (hdr.packet_type == PacketType::Data as u8
if from_peer_id == my_peer_id {
compress_tx_bytes_before.add(buf_len as u64);
if hdr.packet_type == PacketType::Data as u8
|| hdr.packet_type == PacketType::KcpSrc as u8
|| hdr.packet_type == PacketType::KcpDst as u8)
{
let _ = Self::try_compress_and_encrypt(compress_algo, &encryptor, &mut ret)
.await;
|| hdr.packet_type == PacketType::KcpDst as u8
{
let _ =
Self::try_compress_and_encrypt(compress_algo, &encryptor, &mut ret)
.await;
}
compress_tx_bytes_after.add(ret.buf_len() as u64);
self_tx_bytes.add(ret.buf_len() as u64);
self_tx_packets.inc();
} else {
forward_tx_bytes.add(buf_len as u64);
forward_tx_packets.inc();
}
tracing::trace!(?to_peer_id, ?my_peer_id, "need forward");
@@ -629,12 +726,18 @@ impl PeerManager {
continue;
}
self_rx_bytes.add(buf_len as u64);
self_rx_packets.inc();
compress_rx_bytes_before.add(buf_len as u64);
let compressor = DefaultCompressor {};
if let Err(e) = compressor.decompress(&mut ret).await {
tracing::error!(?e, "decompress failed");
continue;
}
compress_rx_bytes_after.add(ret.buf_len() as u64);
if !acl_filter.process_packet_with_acl(
&ret,
true,
@@ -1053,8 +1156,16 @@ impl PeerManager {
return Ok(());
}
self.self_tx_counters
.compress_tx_bytes_before
.add(msg.buf_len() as u64);
Self::try_compress_and_encrypt(self.data_compress_algo, &self.encryptor, &mut msg).await?;
self.self_tx_counters
.compress_tx_bytes_after
.add(msg.buf_len() as u64);
let is_latency_first = self.global_ctx.get_flags().latency_first;
msg.mut_peer_manager_header()
.unwrap()
@@ -1077,6 +1188,11 @@ impl PeerManager {
.to_peer_id
.set(*peer_id);
self.self_tx_counters
.self_tx_bytes
.add(msg.buf_len() as u64);
self.self_tx_counters.self_tx_packets.inc();
if let Err(e) =
Self::send_msg_internal(&self.peers, &self.foreign_network_client, msg, *peer_id)
.await

View File

@@ -4,7 +4,7 @@ use futures::{SinkExt as _, StreamExt};
use tokio::task::JoinSet;
use crate::{
common::{error::Error, PeerId},
common::{error::Error, PeerId, stats_manager::StatsManager},
proto::rpc_impl::{self, bidirect::BidirectRpcManager},
tunnel::packet_def::ZCPacket,
};
@@ -47,6 +47,15 @@ impl PeerRpcManager {
}
}
pub fn new_with_stats_manager(tspt: impl PeerRpcManagerTransport, stats_manager: Arc<StatsManager>) -> Self {
Self {
tspt: Arc::new(Box::new(tspt)),
bidirect_rpc: BidirectRpcManager::new_with_stats_manager(stats_manager),
tasks: Mutex::new(JoinSet::new()),
}
}
pub fn run(&self) {
let ret = self.bidirect_rpc.run_and_create_tunnel();
let (mut rx, mut tx) = ret.split();

View File

@@ -302,3 +302,26 @@ service PortForwardManageRpc {
rpc RemovePortForward(RemovePortForwardRequest) returns (RemovePortForwardResponse);
rpc ListPortForward(ListPortForwardRequest) returns (ListPortForwardResponse);
}
message MetricSnapshot {
string name = 1;
uint64 value = 2;
map<string, string> labels = 3;
}
message GetStatsRequest {}
message GetStatsResponse {
repeated MetricSnapshot metrics = 1;
}
message GetPrometheusStatsRequest {}
message GetPrometheusStatsResponse {
string prometheus_text = 1;
}
service StatsRpc {
rpc GetStats(GetStatsRequest) returns (GetStatsResponse);
rpc GetPrometheusStats(GetPrometheusStatsRequest) returns (GetPrometheusStatsResponse);
}
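
For completeness, a rough sketch of what calling the new service looks like from the client side, mirroring the easytier-cli changes above. The exact import paths and the anyhow-based error handling are assumptions made to keep the example self-contained; `stats_client` stands in for the boxed client returned by `get_stats_client()`.

use anyhow::Result;
use easytier::proto::cli::{GetPrometheusStatsRequest, GetStatsRequest, StatsRpc};
use easytier::proto::rpc_types::controller::BaseController;

// Dump both the structured metrics and the Prometheus text via the new StatsRpc service.
async fn dump_stats(stats_client: Box<dyn StatsRpc<Controller = BaseController>>) -> Result<()> {
    let stats = stats_client
        .get_stats(BaseController::default(), GetStatsRequest {})
        .await?;
    for metric in stats.metrics {
        // `labels` is a map<string, string> on the wire, i.e. a BTreeMap on the Rust side.
        println!("{} = {} {:?}", metric.name, metric.value, metric.labels);
    }

    let prom = stats_client
        .get_prometheus_stats(BaseController::default(), GetPrometheusStatsRequest {})
        .await?;
    print!("{}", prom.prometheus_text);
    Ok(())
}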

View File

@@ -9,7 +9,8 @@ use crate::{
tunnel::{packet_def::PacketType, ring::create_ring_tunnel_pair, Tunnel},
};
use super::{client::Client, server::Server};
use super::{client::Client, server::Server, service_registry::ServiceRegistry};
use crate::common::stats_manager::StatsManager;
pub struct BidirectRpcManager {
rpc_client: Client,
@@ -38,6 +39,20 @@ impl BidirectRpcManager {
}
}
pub fn new_with_stats_manager(stats_manager: Arc<StatsManager>) -> Self {
Self {
rpc_client: Client::new_with_stats_manager(stats_manager.clone()),
rpc_server: Server::new_with_registry_and_stats_manager(Arc::new(ServiceRegistry::new()), stats_manager),
rx_timeout: None,
error: Arc::new(Mutex::new(None)),
tunnel: Mutex::new(None),
running: Arc::new(AtomicBool::new(false)),
tasks: Mutex::new(None),
}
}
pub fn set_rx_timeout(mut self, timeout: Option<std::time::Duration>) -> Self {
self.rx_timeout = timeout;
self

View File

@@ -10,7 +10,10 @@ use tokio::task::JoinSet;
use tokio::time::timeout;
use tokio_stream::StreamExt;
use crate::common::PeerId;
use crate::common::{
stats_manager::{LabelSet, LabelType, MetricName, StatsManager},
PeerId,
};
use crate::defer;
use crate::proto::common::{
CompressionAlgoPb, RpcCompressionInfo, RpcDescriptor, RpcPacket, RpcRequest, RpcResponse,
@@ -66,6 +69,7 @@ pub struct Client {
inflight_requests: InflightRequestTable,
peer_info: PeerInfoTable,
tasks: Mutex<JoinSet<()>>,
stats_manager: Option<Arc<StatsManager>>,
}
impl Client {
@@ -77,6 +81,19 @@ impl Client {
inflight_requests: Arc::new(DashMap::new()),
peer_info: Arc::new(DashMap::new()),
tasks: Mutex::new(JoinSet::new()),
stats_manager: None,
}
}
pub fn new_with_stats_manager(stats_manager: Arc<StatsManager>) -> Self {
let (ring_a, ring_b) = create_ring_tunnel_pair();
Self {
mpsc: Mutex::new(MpscTunnel::new(ring_a, None)),
transport: Mutex::new(MpscTunnel::new(ring_b, None)),
inflight_requests: Arc::new(DashMap::new()),
peer_info: Arc::new(DashMap::new()),
tasks: Mutex::new(JoinSet::new()),
stats_manager: Some(stats_manager),
}
}
@@ -168,6 +185,7 @@ impl Client {
zc_packet_sender: MpscTunnelSender,
inflight_requests: InflightRequestTable,
peer_info: PeerInfoTable,
stats_manager: Option<Arc<StatsManager>>,
_phan: PhantomData<F>,
}
@@ -196,6 +214,7 @@ impl Client {
method: <Self::Descriptor as ServiceDescriptor>::Method,
input: bytes::Bytes,
) -> Result<bytes::Bytes> {
let start_time = std::time::Instant::now();
let transaction_id = CUR_TID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let (tx, mut rx) = mpsc::unbounded_channel();
let key = InflightRequestKey {
@@ -203,6 +222,13 @@ impl Client {
to_peer_id: self.to_peer_id,
transaction_id,
};
let desc = self.service_descriptor();
let labels = LabelSet::new()
.with_label_type(LabelType::NetworkName(self.domain_name.to_string()))
.with_label_type(LabelType::SrcPeerId(self.from_peer_id))
.with_label_type(LabelType::DstPeerId(self.to_peer_id))
.with_label_type(LabelType::ServiceName(desc.name().to_string()))
.with_label_type(LabelType::MethodName(method.name().to_string()));
defer!(self.inflight_requests.remove(&key););
self.inflight_requests.insert(
@@ -210,11 +236,16 @@ impl Client {
InflightRequest {
sender: tx,
merger: PacketMerger::new(),
start_time: std::time::Instant::now(),
start_time,
},
);
let desc = self.service_descriptor();
// Record RPC client TX stats
if let Some(ref stats_manager) = self.stats_manager {
stats_manager
.get_counter(MetricName::PeerRpcClientTx, labels.clone())
.inc();
}
let rpc_desc = RpcDescriptor {
domain_name: self.domain_name.clone(),
@@ -281,12 +312,44 @@ impl Client {
let rpc_resp = RpcResponse::decode(Bytes::from(rpc_packet.body))?;
if let Some(err) = &rpc_resp.error {
// Record RPC error stats
if let Some(ref stats_manager) = self.stats_manager {
let labels = labels
.clone()
.with_label_type(LabelType::ErrorType(format!("{:?}", err.error_kind)))
.with_label_type(LabelType::Status("error".to_string()));
stats_manager
.get_counter(MetricName::PeerRpcErrors, labels.clone())
.inc();
let duration_ms = start_time.elapsed().as_millis() as u64;
stats_manager
.get_counter(MetricName::PeerRpcDuration, labels)
.add(duration_ms);
}
return Err(err.into());
}
let raw_output = Bytes::from(rpc_resp.response.clone());
ctrl.set_raw_output(raw_output.clone());
// Record RPC client RX and duration stats
if let Some(ref stats_manager) = self.stats_manager {
let labels = labels
.clone()
.with_label_type(LabelType::Status("success".to_string()));
stats_manager
.get_counter(MetricName::PeerRpcClientRx, labels.clone())
.inc();
let duration_ms = start_time.elapsed().as_millis() as u64;
stats_manager
.get_counter(MetricName::PeerRpcDuration, labels)
.add(duration_ms);
}
Ok(raw_output)
}
}
@@ -298,6 +361,7 @@ impl Client {
zc_packet_sender: self.mpsc.lock().unwrap().get_sink(),
inflight_requests: self.inflight_requests.clone(),
peer_info: self.peer_info.clone(),
stats_manager: self.stats_manager.clone(),
_phan: PhantomData,
})
}

View File

@@ -10,7 +10,11 @@ use tokio::{task::JoinSet, time::timeout};
use tokio_stream::StreamExt;
use crate::{
common::{join_joinset_background, PeerId},
common::{
join_joinset_background,
stats_manager::{LabelSet, LabelType, MetricName, StatsManager},
PeerId,
},
proto::{
common::{
self, CompressionAlgoPb, RpcCompressionInfo, RpcPacket, RpcRequest, RpcResponse,
@@ -46,6 +50,7 @@ pub struct Server {
tasks: Arc<Mutex<JoinSet<()>>>,
packet_mergers: Arc<DashMap<PacketMergerKey, PacketMerger>>,
stats_manager: Option<Arc<StatsManager>>,
}
impl Server {
@@ -62,6 +67,23 @@ impl Server {
transport: Mutex::new(MpscTunnel::new(ring_b, None)),
tasks: Arc::new(Mutex::new(JoinSet::new())),
packet_mergers: Arc::new(DashMap::new()),
stats_manager: None,
}
}
pub fn new_with_registry_and_stats_manager(
registry: Arc<ServiceRegistry>,
stats_manager: Arc<StatsManager>,
) -> Self {
let (ring_a, ring_b) = create_ring_tunnel_pair();
Self {
registry,
mpsc: Mutex::new(Some(MpscTunnel::new(ring_a, None))),
transport: Mutex::new(MpscTunnel::new(ring_b, None)),
tasks: Arc::new(Mutex::new(JoinSet::new())),
packet_mergers: Arc::new(DashMap::new()),
stats_manager: Some(stats_manager),
}
}
@@ -85,6 +107,7 @@ impl Server {
let packet_merges = self.packet_mergers.clone();
let reg = self.registry.clone();
let stats_manager = self.stats_manager.clone();
let t = Arc::downgrade(&tasks);
let tunnel_info = mpsc.tunnel_info();
tasks.lock().unwrap().spawn(async move {
@@ -133,6 +156,7 @@ impl Server {
packet,
reg.clone(),
tunnel_info.clone(),
stats_manager.clone(),
));
}
Ok(None) => {}
@@ -189,12 +213,27 @@ impl Server {
packet: RpcPacket,
reg: Arc<ServiceRegistry>,
tunnel_info: Option<TunnelInfo>,
stats_manager: Option<Arc<StatsManager>>,
) {
let from_peer = packet.from_peer;
let to_peer = packet.to_peer;
let transaction_id = packet.transaction_id;
let trace_id = packet.trace_id;
let desc = packet.descriptor.clone().unwrap();
let method_name = reg.get_method_name(&desc).unwrap_or("<Nil>".to_owned());
let labels = LabelSet::new()
.with_label_type(LabelType::NetworkName(desc.domain_name.to_string()))
.with_label_type(LabelType::SrcPeerId(from_peer))
.with_label_type(LabelType::DstPeerId(to_peer))
.with_label_type(LabelType::ServiceName(desc.service_name.to_string()))
.with_label_type(LabelType::MethodName(method_name));
// Record RPC server RX stats
if let Some(ref stats_manager) = stats_manager {
stats_manager
.get_counter(MetricName::PeerRpcServerRx, labels.clone())
.inc();
}
let mut resp_msg = RpcResponse::default();
let now = std::time::Instant::now();
@@ -205,9 +244,41 @@ impl Server {
match &resp_bytes {
Ok(r) => {
resp_msg.response = r.clone().into();
// Record successful RPC server TX and duration stats
if let Some(ref stats_manager) = stats_manager {
let labels = labels
.clone()
.with_label_type(LabelType::Status("success".to_string()));
stats_manager
.get_counter(MetricName::PeerRpcServerTx, labels.clone())
.inc();
let duration_ms = now.elapsed().as_millis() as u64;
stats_manager
.get_counter(MetricName::PeerRpcDuration, labels)
.add(duration_ms);
}
}
Err(err) => {
resp_msg.error = Some(err.into());
// Record RPC server error stats
if let Some(ref stats_manager) = stats_manager {
let labels = labels
.clone()
.with_label_type(LabelType::Status("error".to_string()));
stats_manager
.get_counter(MetricName::PeerRpcErrors, labels.clone())
.inc();
let duration_ms = now.elapsed().as_millis() as u64;
stats_manager
.get_counter(MetricName::PeerRpcDuration, labels)
.add(duration_ms);
}
}
};
resp_msg.runtime_us = now.elapsed().as_micros() as u64;

View File

@@ -78,6 +78,14 @@ impl ServiceRegistry {
self.table.insert(key, entry);
}
pub fn get_method_name(&self, rpc_desc: &RpcDescriptor) -> Option<String> {
let service_key = ServiceKey::from(rpc_desc);
let entry = self.table.get(&service_key)?;
let method_index = rpc_desc.method_index as u8;
let method_name = entry.service.get_method_name(method_index).ok()?;
Some(method_name)
}
pub fn unregister<H: Handler<Controller = RpcController>>(
&self,
h: H,

View File

@@ -1,4 +1,6 @@
//! Traits for defining generic RPC handlers.
use crate::proto::rpc_types::descriptor::MethodDescriptor;
use super::{
controller::Controller,
descriptor::{self, ServiceDescriptor},
@@ -49,6 +51,8 @@ pub trait HandlerExt: Send + Sync + 'static {
method_index: u8,
input: bytes::Bytes,
) -> super::error::Result<bytes::Bytes>;
fn get_method_name(&self, method_index: u8) -> super::error::Result<String>;
}
#[async_trait::async_trait]
@@ -64,4 +68,10 @@ impl<C: Controller, T: Handler<Controller = C>> HandlerExt for T {
let method = self.get_method_from_index(method_index)?;
self.call(ctrl, method, input).await
}
fn get_method_name(&self, method_index: u8) -> super::error::Result<String> {
let method = self.get_method_from_index(method_index)?;
let name = method.name().to_string();
Ok(name)
}
}