[fix] fix ipc suffix, use port instead

liyonghua0910
2025-09-12 17:44:05 +08:00
parent 013338f2f5
commit c7b8f4f8c6
6 changed files with 28 additions and 44 deletions
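In short: every IPC signal shared by the API server, engine, cache manager, and worker processes is now suffixed with the engine worker queue port instead of a process id, so all participants derive the same shared-memory name no matter which process created the segment. Below is a minimal sketch of the naming scheme this relies on, assuming IPCSignal maps name plus suffix onto one shared-memory segment; this is an illustrative stand-in, not FastDeploy's actual implementation, which may differ in detail:

import numpy as np
from multiprocessing import shared_memory

class IPCSignalSketch:
    """Illustrative stand-in for FastDeploy's IPCSignal (assumption)."""

    def __init__(self, name, array, dtype, suffix, create):
        shm_name = f"{name}.{suffix}"  # same (name, suffix) => same segment in every process
        if create:
            self._shm = shared_memory.SharedMemory(name=shm_name, create=True, size=array.nbytes)
        else:
            self._shm = shared_memory.SharedMemory(name=shm_name)  # attach to an existing segment
        self.value = np.ndarray(array.shape, dtype=dtype, buffer=self._shm.buf)
        if create:
            self.value[:] = array  # initialize once, on the creating side

A pid-derived suffix makes this fragile: the API server, engine, and workers each have a different pid, so a segment created under one pid cannot be found by a sibling attaching under another. The engine worker queue port is shared configuration, so every process computes the same name, which is presumably why this commit switches to it.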

View File

@@ -136,7 +136,7 @@ class PrefixCacheManager:
             name="cache_task_broadcast_signal",
             array=broadcast_cache_task_flag_array,
             dtype=np.int32,
-            suffix=pid_suffix,
+            suffix=engine_worker_queue_port,
             create=True,
         )
@@ -169,7 +169,7 @@ class PrefixCacheManager:
             name="cache_ready_signal",
             array=cache_ready_signal_data,
             dtype=np.int32,
-            suffix=pid_suffix,
+            suffix=engine_worker_queue_port,
             create=False,
         )
         swap_space_ready_data = np.zeros(shape=[tensor_parallel_size], dtype=np.int32)
@@ -177,7 +177,15 @@ class PrefixCacheManager:
             name="swap_space_ready_signal",
             array=swap_space_ready_data,
             dtype=np.int32,
-            suffix=pid_suffix,
+            suffix=engine_worker_queue_port,
             create=False,
         )
+        prefix_tree_status = np.zeros([1], dtype=np.int32)
+        self.prefix_tree_status_signal = IPCSignal(
+            name="prefix_tree_status",
+            array=prefix_tree_status,
+            dtype=np.int32,
+            suffix=engine_worker_queue_port,
+            create=False,
+        )
@@ -237,7 +245,7 @@ class PrefixCacheManager:
             logger.info("Enable hierarchical cache.")
             threading.Thread(target=self.recv_data_transfer_result).start()
         if cache_config.enable_prefix_caching:
-            threading.Thread(target=self.clear_prefix_cache, args=(pid_suffix,), daemon=True).start()
+            threading.Thread(target=self.clear_prefix_cache, daemon=True).start()
         return cache_manager_processes
@@ -1358,19 +1366,12 @@ class PrefixCacheManager:
         main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
         main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)

-    def clear_prefix_cache(self, pid_suffix):
+    def clear_prefix_cache(self):
         """
         If the model weights status is updating or clearing, reset prefix cache tree
         """
         logger.info("Start a thread to clear prefix cache when model weights are cleared.")
-        prefix_tree_status = np.zeros([1], dtype=np.int32)
-        prefix_tree_status_signal = IPCSignal(
-            name="prefix_tree_status",
-            array=prefix_tree_status,
-            dtype=np.int32,
-            suffix=pid_suffix,
-            create=False,
-        )
+        prefix_tree_status_signal = self.prefix_tree_status_signal
         while True:
             if prefix_tree_status_signal.value[0] == PrefixTreeStatus.CLEARING:
                 self.reset()
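The last hunk is cut off after self.reset() in this view. A sketch of how the resulting loop plausibly continues; the CLEARED acknowledgement and the poll interval are assumptions, not shown in the diff:

def clear_prefix_cache(self):
    """Reset the prefix tree whenever the engine signals a clear (sketch)."""
    logger.info("Start a thread to clear prefix cache when model weights are cleared.")
    prefix_tree_status_signal = self.prefix_tree_status_signal  # shared instance from __init__
    while True:
        if prefix_tree_status_signal.value[0] == PrefixTreeStatus.CLEARING:
            self.reset()  # drop all cached prefix blocks
            prefix_tree_status_signal.value[0] = PrefixTreeStatus.CLEARED  # assumption: ack back to the engine
        time.sleep(0.01)  # assumption: poll interval

Reusing the signal built in __init__ also means the thread no longer needs a suffix argument at all, which is why the Thread(...) call above drops args=(pid_suffix,).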

View File

@@ -178,7 +178,7 @@ class EngineService:
             create=True,
         )

-        cache_ready_signal_data = np.zeros(shape=[self.cfg.tensor_parallel_size], dtype=np.int32)
+        cache_ready_signal_data = np.zeros(shape=[self.cfg.parallel_config.tensor_parallel_size], dtype=np.int32)
         self.cache_ready_signal = IPCSignal(
             name="cache_ready_signal",
             array=cache_ready_signal_data,
@@ -187,7 +187,7 @@ class EngineService:
             create=True,
         )

-        swap_space_ready_signal_data = np.zeros(shape=[self.cfg.tensor_parallel_size], dtype=np.int32)
+        swap_space_ready_signal_data = np.zeros(shape=[self.cfg.parallel_config.tensor_parallel_size], dtype=np.int32)
         self.swap_space_ready_signal = IPCSignal(
             name="swap_space_ready_signal",
             array=swap_space_ready_signal_data,
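These hunks also show the pairing convention visible throughout the diff: EngineService allocates each segment with create=True, and every other process attaches with create=False (as PrefixCacheManager above and GPUModelRunner below do). A small demonstration using the IPCSignalSketch stand-in defined under the commit header, with a hypothetical port 8002 and tensor_parallel_size 4:

import numpy as np

port = 8002  # hypothetical engine_worker_queue_port
tp = 4
# Engine side: create the segment, one readiness slot per rank.
cache_ready = IPCSignalSketch(name="cache_ready_signal", array=np.zeros([tp], dtype=np.int32),
                              dtype=np.int32, suffix=port, create=True)
# Worker side (different process, different pid): attach to the same segment.
worker_view = IPCSignalSketch(name="cache_ready_signal", array=np.zeros([tp], dtype=np.int32),
                              dtype=np.int32, suffix=port, create=False)
worker_view.value[0] = 1  # rank 0 reports its cache is ready
print(cache_ready.value)  # engine observes [1 0 0 0]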

View File

@@ -631,12 +631,6 @@ class LLMEngine:
             num_gpu_blocks = self.get_profile_block_num_signal.value[0]
             self.cfg.cache_config.reset(num_gpu_blocks)
             self.engine.resource_manager.reset_cache_config(self.cfg.cache_config)
-            max_running_requests = num_gpu_blocks * self.cfg.cache_config.block_size // self.cfg.max_model_len
-            console_logger.info(
-                f"Detected {num_gpu_blocks} available gpu blocks in cache. "
-                f"FastDeploy will be serving {max_running_requests} running requests "
-                f"if each sequence reaches its maximum length: {self.cfg.max_model_len}"
-            )
         if self.cfg.cache_config.enable_prefix_caching or self.cfg.splitwise_role != "mixed":
             device_ids = self.cfg.device_ids.split(",")
             self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix, self.cfg.splitwise_role != "mixed")
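The deleted console message reported a capacity estimate that is still worth knowing how to compute. With illustrative numbers (not from the commit):

num_gpu_blocks = 1000   # measured during profiling
block_size = 64         # tokens per KV-cache block
max_model_len = 8192    # maximum sequence length
max_running_requests = num_gpu_blocks * block_size // max_model_len
print(max_running_requests)  # 7: at most 7 concurrent sequences if all reach max length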

View File

@@ -117,16 +117,12 @@ class EngineClient:
             suffix=port,
             create=False,
         )
-        self.connection_manager = DealerConnectionManager(
-            pid, max_connections=int(os.getenv("FD_DEALER_CONNECTIONS", 50))
-        )
-        self.connection_initialized = False
         prefix_tree_status = np.zeros([1], dtype=np.int32)
         self.prefix_tree_status_signal = IPCSignal(
             name="prefix_tree_status",
             array=prefix_tree_status,
             dtype=np.int32,
-            suffix=pid,
+            suffix=port,
             create=False,
         )
         kv_cache_status = np.zeros([1], dtype=np.int32)
@@ -134,9 +130,13 @@ class EngineClient:
             name="kv_cache_status",
             array=kv_cache_status,
             dtype=np.int32,
-            suffix=pid,
+            suffix=port,
             create=False,
         )
+        self.connection_manager = DealerConnectionManager(
+            pid, max_connections=int(os.getenv("FD_DEALER_CONNECTIONS", 50))
+        )
+        self.connection_initialized = False
         self.clear_update_lock = threading.Lock()

     def create_zmq_client(self, model, mode):
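With suffix=pid, the client keyed these signals to a process id; since the same signals are keyed by port everywhere else in this commit, the client must follow suit or it would attach to nonexistent segments. A hypothetical helper, not part of the commit, sketching what the now-consistent client-side signal enables (PrefixTreeStatus constants assumed):

def wait_until_prefix_tree_ready(self, poll_s: float = 0.01):
    """Hypothetical helper: block while the engine reports the prefix tree as CLEARING."""
    import time
    while self.prefix_tree_status_signal.value[0] == PrefixTreeStatus.CLEARING:
        time.sleep(poll_s)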

View File

@@ -1151,7 +1151,7 @@ class GPUModelRunner(ModelRunnerBase):
             name="cache_ready_signal",
             array=cache_ready_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )

View File

@@ -175,7 +175,7 @@ class PaddleDisWorkerProc:
             name="launched_expert_service_signal",
             array=launched_expert_service_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )
         while self.launched_expert_service_signal.value[self.local_rank % self.max_chips_per_node] == 0:
@@ -192,7 +192,7 @@ class PaddleDisWorkerProc:
             name="worker_ready_signal",
             array=workers_ready,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )
         self.worker_ready_signal.value[self.local_rank % self.max_chips_per_node] = 1
@@ -391,25 +391,14 @@ class PaddleDisWorkerProc:
                 name="get_profile_block_num",
                 array=get_profile_block_num,
                 dtype=np.int32,
-                suffix=self.parallel_config.engine_pid,
+                suffix=self.parallel_config.engine_worker_queue_port,
                 create=False,
             )
             self.get_profile_block_num_signal.value[0] = num_blocks_local
         else:
             num_blocks_local = self.fd_config.parallel_config.total_block_num
         logger.info(f"------- num_blocks_global: {num_blocks_local} --------")
-        # wait engine launch cache_manager
-        # if self.parallel_config.splitwise_role != "mixed":
-        #     launched_cache_manager_signal_data = np.zeros([1], dtype=np.int32)
-        #     self.launched_cache_manager_signal = IPCSignal(
-        #         name="launched_cache_manager_signal",
-        #         array=launched_cache_manager_signal_data,
-        #         dtype=np.int32,
-        #         suffix=self.parallel_config.engine_pid,
-        #         create=False,
-        #     )
-        #     while np.any(self.launched_cache_manager_signal.value[0] <= 0):
-        #         time.sleep(0.01)
         # 4. init kv_cache with accurate num_blocks
         self.worker.initialize_cache(num_gpu_blocks=num_blocks_local)
@@ -444,7 +433,7 @@ class PaddleDisWorkerProc:
             name="loaded_model_signal",
             array=loaded_model_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )
         if self.ranks > 1:
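One detail worth noting in the worker hunks: signal slots are indexed with self.local_rank % self.max_chips_per_node, which folds a global rank into a per-node slot, presumably because each node keeps its own copy of the signal arrays. A quick illustration with a hypothetical 8-chip node:

max_chips_per_node = 8
for local_rank in (0, 3, 8, 11):
    print(local_rank, "->", local_rank % max_chips_per_node)
# 0 -> 0, 3 -> 3, 8 -> 0, 11 -> 3: ranks on the second node land in slots 0 and 3 there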