mirror of
https://github.com/EasyTier/EasyTier.git
synced 2025-09-27 04:56:07 +08:00
remove lock on pipelines
This commit is contained in:
@@ -306,7 +306,7 @@ impl IcmpProxy {
|
||||
return Err(anyhow::anyhow!("peer manager is gone").into());
|
||||
};
|
||||
|
||||
pm.add_packet_process_pipeline(Box::new(self.clone())).await;
|
||||
pm.add_packet_process_pipeline(self.clone()).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@@ -341,13 +341,13 @@ impl KcpProxySrc {
|
||||
|
||||
pub async fn start(&self) {
|
||||
self.peer_manager
|
||||
.add_nic_packet_process_pipeline(Box::new(self.tcp_proxy.clone()))
|
||||
.add_nic_packet_process_pipeline(Arc::new(self.tcp_proxy.clone()))
|
||||
.await;
|
||||
self.peer_manager
|
||||
.add_packet_process_pipeline(Box::new(self.tcp_proxy.0.clone()))
|
||||
.add_packet_process_pipeline(Arc::new(self.tcp_proxy.0.clone()))
|
||||
.await;
|
||||
self.peer_manager
|
||||
.add_packet_process_pipeline(Box::new(KcpEndpointFilter {
|
||||
.add_packet_process_pipeline(Arc::new(KcpEndpointFilter {
|
||||
kcp_endpoint: self.kcp_endpoint.clone(),
|
||||
is_src: true,
|
||||
}))
|
||||
@@ -484,7 +484,7 @@ impl KcpProxyDst {
|
||||
pub async fn start(&mut self) {
|
||||
self.run_accept_task().await;
|
||||
self.peer_manager
|
||||
.add_packet_process_pipeline(Box::new(KcpEndpointFilter {
|
||||
.add_packet_process_pipeline(Arc::new(KcpEndpointFilter {
|
||||
kcp_endpoint: self.kcp_endpoint.clone(),
|
||||
is_src: false,
|
||||
}))
|
||||
|
@@ -227,10 +227,10 @@ impl QUICProxySrc {
|
||||
|
||||
pub async fn start(&self) {
|
||||
self.peer_manager
|
||||
.add_nic_packet_process_pipeline(Box::new(self.tcp_proxy.clone()))
|
||||
.add_nic_packet_process_pipeline(Arc::new(self.tcp_proxy.clone()))
|
||||
.await;
|
||||
self.peer_manager
|
||||
.add_packet_process_pipeline(Box::new(self.tcp_proxy.0.clone()))
|
||||
.add_packet_process_pipeline(Arc::new(self.tcp_proxy.0.clone()))
|
||||
.await;
|
||||
self.tcp_proxy.0.start(false).await.unwrap();
|
||||
}
|
||||
|
@@ -621,7 +621,7 @@ impl Socks5Server {
|
||||
|
||||
if need_start {
|
||||
self.peer_manager
|
||||
.add_packet_process_pipeline(Box::new(self.clone()))
|
||||
.add_packet_process_pipeline(self.clone())
|
||||
.await;
|
||||
|
||||
self.run_net_update_task().await;
|
||||
|
@@ -476,10 +476,10 @@ impl<C: NatDstConnector> TcpProxy<C> {
|
||||
self.run_listener().await?;
|
||||
if add_pipeline {
|
||||
self.peer_manager
|
||||
.add_packet_process_pipeline(Box::new(self.clone()))
|
||||
.add_packet_process_pipeline(self.clone())
|
||||
.await;
|
||||
self.peer_manager
|
||||
.add_nic_packet_process_pipeline(Box::new(self.clone()))
|
||||
.add_nic_packet_process_pipeline(self.clone())
|
||||
.await;
|
||||
}
|
||||
join_joinset_background(self.tasks.clone(), "TcpProxy".to_owned());
|
||||
|
@@ -404,7 +404,7 @@ impl UdpProxy {
|
||||
|
||||
pub async fn start(self: &Arc<Self>) -> Result<(), Error> {
|
||||
self.peer_manager
|
||||
.add_packet_process_pipeline(Box::new(self.clone()))
|
||||
.add_packet_process_pipeline(self.clone())
|
||||
.await;
|
||||
|
||||
// clean up nat table
|
||||
|
@@ -404,9 +404,7 @@ impl MagicDnsServerInstance {
|
||||
.register(MagicDnsServerRpcServer::new(data.clone()), "");
|
||||
rpc_server.set_hook(data.clone());
|
||||
|
||||
peer_mgr
|
||||
.add_nic_packet_process_pipeline(Box::new(data.clone()))
|
||||
.await;
|
||||
peer_mgr.add_nic_packet_process_pipeline(data.clone()).await;
|
||||
|
||||
let data_clone = data.clone();
|
||||
tokio::task::spawn_blocking(move || data_clone.do_system_config(DEFAULT_ET_DNS_ZONE))
|
||||
|
@@ -23,6 +23,8 @@ pub mod peer_task;
|
||||
#[cfg(test)]
|
||||
pub mod tests;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::tunnel::packet_def::ZCPacket;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -43,8 +45,8 @@ pub trait NicPacketFilter {
|
||||
}
|
||||
}
|
||||
|
||||
type BoxPeerPacketFilter = Box<dyn PeerPacketFilter + Send + Sync>;
|
||||
type BoxNicPacketFilter = Box<dyn NicPacketFilter + Send + Sync>;
|
||||
type BoxPeerPacketFilter = Arc<dyn PeerPacketFilter + Send + Sync>;
|
||||
type BoxNicPacketFilter = Arc<dyn NicPacketFilter + Send + Sync>;
|
||||
|
||||
// pub type PacketRecvChan = tachyonix::Sender<ZCPacket>;
|
||||
// pub type PacketRecvChanReceiver = tachyonix::Receiver<ZCPacket>;
|
||||
|
@@ -6,6 +6,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use arc_swap::ArcSwap;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use dashmap::DashMap;
|
||||
@@ -13,7 +14,7 @@ use dashmap::DashMap;
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
||||
Mutex, RwLock,
|
||||
Mutex,
|
||||
},
|
||||
task::JoinSet,
|
||||
};
|
||||
@@ -131,8 +132,8 @@ pub struct PeerManager {
|
||||
peer_rpc_mgr: Arc<PeerRpcManager>,
|
||||
peer_rpc_tspt: Arc<RpcTransport>,
|
||||
|
||||
peer_packet_process_pipeline: Arc<RwLock<Vec<BoxPeerPacketFilter>>>,
|
||||
nic_packet_process_pipeline: Arc<RwLock<Vec<BoxNicPacketFilter>>>,
|
||||
peer_packet_process_pipeline: Arc<ArcSwap<Vec<BoxPeerPacketFilter>>>,
|
||||
nic_packet_process_pipeline: ArcSwap<Vec<BoxNicPacketFilter>>,
|
||||
|
||||
route_algo_inst: RouteAlgoInst,
|
||||
|
||||
@@ -261,8 +262,8 @@ impl PeerManager {
|
||||
peer_rpc_mgr,
|
||||
peer_rpc_tspt: rpc_tspt,
|
||||
|
||||
peer_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())),
|
||||
nic_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())),
|
||||
peer_packet_process_pipeline: Arc::new(ArcSwap::from(Arc::new(Vec::new()))),
|
||||
nic_packet_process_pipeline: ArcSwap::from(Arc::new(Vec::new())),
|
||||
|
||||
route_algo_inst,
|
||||
|
||||
@@ -647,7 +648,7 @@ impl PeerManager {
|
||||
let mut processed = false;
|
||||
let mut zc_packet = Some(ret);
|
||||
let mut idx = 0;
|
||||
for pipeline in pipe_line.read().await.iter().rev() {
|
||||
for pipeline in pipe_line.load().iter().rev() {
|
||||
tracing::trace!(?zc_packet, ?idx, "try_process_packet_from_peer");
|
||||
idx += 1;
|
||||
zc_packet = pipeline
|
||||
@@ -669,18 +670,20 @@ impl PeerManager {
|
||||
|
||||
pub async fn add_packet_process_pipeline(&self, pipeline: BoxPeerPacketFilter) {
|
||||
// newest pipeline will be executed first
|
||||
let current = self.peer_packet_process_pipeline.load();
|
||||
let mut new_pipelines = (*(*current)).iter().map(|x| x.clone()).collect::<Vec<_>>();
|
||||
new_pipelines.push(pipeline);
|
||||
self.peer_packet_process_pipeline
|
||||
.write()
|
||||
.await
|
||||
.push(pipeline);
|
||||
.swap(Arc::new(new_pipelines));
|
||||
}
|
||||
|
||||
pub async fn add_nic_packet_process_pipeline(&self, pipeline: BoxNicPacketFilter) {
|
||||
// newest pipeline will be executed first
|
||||
let current = self.nic_packet_process_pipeline.load();
|
||||
let mut new_pipelines = (*current).iter().map(|x| x.clone()).collect::<Vec<_>>();
|
||||
new_pipelines.push(pipeline);
|
||||
self.nic_packet_process_pipeline
|
||||
.write()
|
||||
.await
|
||||
.push(pipeline);
|
||||
.swap(Arc::new(new_pipelines));
|
||||
}
|
||||
|
||||
async fn init_packet_process_pipeline(&self) {
|
||||
@@ -702,7 +705,7 @@ impl PeerManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
self.add_packet_process_pipeline(Box::new(NicPacketProcessor {
|
||||
self.add_packet_process_pipeline(Arc::new(NicPacketProcessor {
|
||||
nic_channel: self.nic_channel.clone(),
|
||||
}))
|
||||
.await;
|
||||
@@ -727,7 +730,7 @@ impl PeerManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
self.add_packet_process_pipeline(Box::new(PeerRpcPacketProcessor {
|
||||
self.add_packet_process_pipeline(Arc::new(PeerRpcPacketProcessor {
|
||||
peer_rpc_tspt_sender: self.peer_rpc_tspt.peer_rpc_tspt_sender.clone(),
|
||||
}))
|
||||
.await;
|
||||
@@ -735,12 +738,8 @@ impl PeerManager {
|
||||
|
||||
pub async fn add_route<T>(&self, route: T)
|
||||
where
|
||||
T: Route + PeerPacketFilter + Send + Sync + Clone + 'static,
|
||||
T: Route + Send + Sync + Clone + 'static,
|
||||
{
|
||||
// for route
|
||||
self.add_packet_process_pipeline(Box::new(route.clone()))
|
||||
.await;
|
||||
|
||||
struct Interface {
|
||||
my_peer_id: PeerId,
|
||||
peers: Weak<PeerMap>,
|
||||
@@ -866,15 +865,19 @@ impl PeerManager {
|
||||
return;
|
||||
}
|
||||
|
||||
for pipeline in self.nic_packet_process_pipeline.read().await.iter().rev() {
|
||||
let pipelines = self.nic_packet_process_pipeline.load();
|
||||
for pipeline in pipelines.iter().rev() {
|
||||
let _ = pipeline.try_process_packet_from_nic(data).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_nic_packet_process_pipeline(&self, id: String) -> Result<(), Error> {
|
||||
let mut pipelines = self.nic_packet_process_pipeline.write().await;
|
||||
if let Some(pos) = pipelines.iter().position(|x| x.id() == id) {
|
||||
pipelines.remove(pos);
|
||||
let current = self.nic_packet_process_pipeline.load();
|
||||
let mut new_pipelines = (*current).iter().map(|x| x.clone()).collect::<Vec<_>>();
|
||||
if let Some(pos) = new_pipelines.iter().position(|x| x.id() == id) {
|
||||
new_pipelines.remove(pos);
|
||||
self.nic_packet_process_pipeline
|
||||
.swap(Arc::new(new_pipelines));
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::NotFound)
|
||||
@@ -1206,10 +1209,9 @@ impl PeerManager {
|
||||
}
|
||||
|
||||
pub async fn clear_resources(&self) {
|
||||
let mut peer_pipeline = self.peer_packet_process_pipeline.write().await;
|
||||
peer_pipeline.clear();
|
||||
let mut nic_pipeline = self.nic_packet_process_pipeline.write().await;
|
||||
nic_pipeline.clear();
|
||||
self.peer_packet_process_pipeline
|
||||
.store(Arc::new(Vec::new()));
|
||||
self.nic_packet_process_pipeline.store(Arc::new(Vec::new()));
|
||||
|
||||
self.peer_rpc_mgr.rpc_server().registry().unregister_all();
|
||||
}
|
||||
|
@@ -55,7 +55,6 @@ use super::{
|
||||
DefaultRouteCostCalculator, ForeignNetworkRouteInfoMap, NextHopPolicy, RouteCostCalculator,
|
||||
RouteCostCalculatorInterface,
|
||||
},
|
||||
PeerPacketFilter,
|
||||
};
|
||||
|
||||
static SERVICE_ID: u32 = 7;
|
||||
@@ -2369,8 +2368,6 @@ impl Route for PeerRoute {
|
||||
}
|
||||
}
|
||||
|
||||
impl PeerPacketFilter for Arc<PeerRoute> {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
|
@@ -203,7 +203,7 @@ impl WireGuardImpl {
|
||||
}
|
||||
|
||||
self.peer_mgr
|
||||
.add_packet_process_pipeline(Box::new(PeerPacketFilterForVpnPortal {
|
||||
.add_packet_process_pipeline(Arc::new(PeerPacketFilterForVpnPortal {
|
||||
wg_peer_ip_table: self.wg_peer_ip_table.clone(),
|
||||
}))
|
||||
.await;
|
||||
|
Reference in New Issue
Block a user