mirror of
https://github.com/EasyTier/EasyTier.git
synced 2025-09-26 20:51:17 +08:00
fix mem leak of token bucket (#1055)
This commit is contained in:
@@ -67,12 +67,16 @@ impl TokenBucket {
|
||||
});
|
||||
|
||||
// Start background refill task
|
||||
let arc_clone = arc_self.clone();
|
||||
let weak_bucket = Arc::downgrade(&arc_self);
|
||||
let refill_interval = arc_self.config.refill_interval;
|
||||
let refill_task = tokio::spawn(async move {
|
||||
let mut interval = time::interval(arc_clone.config.refill_interval);
|
||||
let mut interval = time::interval(refill_interval);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
arc_clone.refill();
|
||||
let Some(bucket) = weak_bucket.upgrade() else {
|
||||
break;
|
||||
};
|
||||
bucket.refill();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -167,9 +171,9 @@ impl TokenBucketManager {
|
||||
let retain_task = tokio::spawn(async move {
|
||||
loop {
|
||||
// Retain only buckets that are still in use
|
||||
buckets_clone.retain(|_, bucket| Arc::<TokenBucket>::strong_count(bucket) <= 1);
|
||||
buckets_clone.retain(|_, bucket| Arc::<TokenBucket>::strong_count(bucket) > 1);
|
||||
// Sleep for a while before next retention check
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -190,6 +194,16 @@ impl TokenBucketManager {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
connector::udp_hole_punch::tests::create_mock_peer_manager_with_mock_stun,
|
||||
peers::{
|
||||
foreign_network_manager::tests::create_mock_peer_manager_for_foreign_network,
|
||||
tests::connect_peer_manager,
|
||||
},
|
||||
proto::common::NatType,
|
||||
tunnel::common::tests::wait_for_condition,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
@@ -309,4 +323,58 @@ mod tests {
|
||||
tokens
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_token_bucket_free() {
|
||||
let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||
|
||||
for i in 0..10 {
|
||||
let pma_net1 = create_mock_peer_manager_for_foreign_network(&format!("net{}", i)).await;
|
||||
|
||||
connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await;
|
||||
wait_for_condition(
|
||||
|| async { pma_net1.list_routes().await.len() == 1 },
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
println!("net{}", i);
|
||||
println!(
|
||||
"buckets: {}",
|
||||
pm_center1
|
||||
.get_global_ctx()
|
||||
.token_bucket_manager()
|
||||
.buckets
|
||||
.len()
|
||||
);
|
||||
|
||||
drop(pma_net1);
|
||||
wait_for_condition(
|
||||
|| async {
|
||||
pm_center1
|
||||
.get_foreign_network_manager()
|
||||
.list_foreign_networks()
|
||||
.await
|
||||
.foreign_networks
|
||||
.len()
|
||||
== 0
|
||||
},
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// wait token bucket empty
|
||||
wait_for_condition(
|
||||
|| async {
|
||||
pm_center1
|
||||
.get_global_ctx()
|
||||
.token_bucket_manager()
|
||||
.buckets
|
||||
.len()
|
||||
== 0
|
||||
},
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
@@ -675,7 +675,7 @@ impl Drop for ForeignNetworkManager {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
pub mod tests {
|
||||
use crate::{
|
||||
common::global_ctx::tests::get_mock_global_ctx_with_network,
|
||||
connector::udp_hole_punch::tests::{
|
||||
@@ -711,7 +711,7 @@ mod tests {
|
||||
peer_mgr
|
||||
}
|
||||
|
||||
async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc<PeerManager> {
|
||||
pub async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc<PeerManager> {
|
||||
create_mock_peer_manager_for_foreign_network_ext(network, network).await
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user