Update On Sun May 4 20:34:17 CEST 2025

This commit is contained in:
github-action[bot]
2025-05-04 20:34:17 +02:00
parent 14378a30d0
commit 40ece1c105
167 changed files with 23856 additions and 10037 deletions

View File

@@ -10,7 +10,6 @@ use crate::{
lightweight::{entry_lightweight_mode, is_in_lightweight_mode},
mihomo::Rate,
},
process::AsyncHandler,
resolve,
utils::{dirs::find_target_icons, i18n::t, logging::Type, resolve::VERSION},
};
@@ -390,56 +389,136 @@ impl Tray {
// 如果已经订阅,先取消订阅
if *self.is_subscribed.read() {
self.unsubscribe_traffic();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let (shutdown_tx, shutdown_rx) = broadcast::channel(3);
*self.shutdown_tx.write() = Some(shutdown_tx);
*self.is_subscribed.write() = true;
let speed_rate = Arc::clone(&self.speed_rate);
let is_subscribed = Arc::clone(&self.is_subscribed);
AsyncHandler::spawn(move || {
let mut shutdown = shutdown_rx;
let speed_rate = speed_rate.clone(); // 确保 Arc 被正确克隆
let is_subscribed = is_subscribed.clone();
// 使用单线程防止阻塞主线程
std::thread::Builder::new()
.name("traffic-monitor".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to build tokio runtime for traffic monitor");
// 在单独的运行时中执行异步任务
rt.block_on(async move {
let mut shutdown = shutdown_rx;
let speed_rate = speed_rate.clone();
let is_subscribed = is_subscribed.clone();
let mut consecutive_errors = 0;
let max_consecutive_errors = 5;
Box::pin(async move {
'outer: loop {
match Traffic::get_traffic_stream().await {
Ok(mut stream) => loop {
tokio::select! {
Some(traffic) = stream.next() => {
if let Ok(traffic) = traffic {
let guard = speed_rate.lock();
let enable_tray_speed: bool = Config::verge()
.latest()
.enable_tray_speed
.unwrap_or(true);
if !enable_tray_speed {
continue;
}
if let Some(sr) = guard.as_ref() {
if let Some(rate) = sr.update_and_check_changed(traffic.up, traffic.down) {
let _ = Tray::global().update_icon(Some(rate));
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
'outer: loop {
if !*is_subscribed.read() {
log::info!(target: "app", "Traffic subscription has been cancelled");
break;
}
match tokio::time::timeout(
std::time::Duration::from_secs(5),
Traffic::get_traffic_stream()
).await {
Ok(stream_result) => {
match stream_result {
Ok(mut stream) => {
consecutive_errors = 0;
loop {
tokio::select! {
traffic_result = stream.next() => {
match traffic_result {
Some(Ok(traffic)) => {
if let Ok(speedrate_result) = tokio::time::timeout(
std::time::Duration::from_millis(50),
async {
let guard = speed_rate.try_lock();
if let Some(guard) = guard {
if let Some(sr) = guard.as_ref() {
sr.update_and_check_changed(traffic.up, traffic.down)
} else {
None
}
} else {
None
}
}
).await {
if let Some(rate) = speedrate_result {
let _ = tokio::time::timeout(
std::time::Duration::from_millis(100),
async { let _ = Tray::global().update_icon(Some(rate)); }
).await;
}
}
},
Some(Err(e)) => {
log::error!(target: "app", "Traffic stream error: {}", e);
consecutive_errors += 1;
if consecutive_errors >= max_consecutive_errors {
log::error!(target: "app", "Too many errors, reconnecting traffic stream");
break;
}
},
None => {
log::info!(target: "app", "Traffic stream ended, reconnecting");
break;
}
}
},
_ = shutdown.recv() => {
log::info!(target: "app", "Received shutdown signal for traffic stream");
break 'outer;
},
_ = interval.tick() => {
if !*is_subscribed.read() {
log::info!(target: "app", "Traffic monitor detected subscription cancelled");
break 'outer;
}
log::debug!(target: "app", "Traffic subscription periodic health check");
},
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
log::info!(target: "app", "Traffic stream max active time reached, reconnecting");
break;
}
}
}
},
Err(e) => {
log::error!(target: "app", "Failed to get traffic stream: {}", e);
consecutive_errors += 1;
if consecutive_errors >= max_consecutive_errors {
log::error!(target: "app", "Too many consecutive errors, pausing traffic monitoring");
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
consecutive_errors = 0;
} else {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
_ = shutdown.recv() => break 'outer,
}
},
Err(e) => {
log::error!(target: "app", "Failed to get traffic stream: {}", e);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if !*is_subscribed.read() {
break;
},
Err(_) => {
log::error!(target: "app", "Traffic stream initialization timed out");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
if !*is_subscribed.read() {
break;
}
}
}
log::info!(target: "app", "Traffic subscription thread terminated");
});
})
});
.expect("Failed to spawn traffic monitor thread");
Ok(())
}