diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py
index 8a3e4a0e4..03dffe906 100644
--- a/fastdeploy/cache_manager/prefix_cache_manager.py
+++ b/fastdeploy/cache_manager/prefix_cache_manager.py
@@ -136,7 +136,7 @@ class PrefixCacheManager:
             name="cache_task_broadcast_signal",
             array=broadcast_cache_task_flag_array,
             dtype=np.int32,
-            suffix=pid_suffix,
+            suffix=engine_worker_queue_port,
             create=True,
         )
 
@@ -169,7 +169,7 @@ class PrefixCacheManager:
             name="cache_ready_signal",
             array=cache_ready_signal_data,
             dtype=np.int32,
-            suffix=pid_suffix,
+            suffix=engine_worker_queue_port,
             create=False,
         )
         swap_space_ready_data = np.zeros(shape=[tensor_parallel_size], dtype=np.int32)
@@ -177,7 +177,15 @@ class PrefixCacheManager:
             name="swap_space_ready_signal",
             array=swap_space_ready_data,
             dtype=np.int32,
-            suffix=pid_suffix,
+            suffix=engine_worker_queue_port,
+            create=False,
+        )
+        prefix_tree_status = np.zeros([1], dtype=np.int32)
+        self.prefix_tree_status_signal = IPCSignal(
+            name="prefix_tree_status",
+            array=prefix_tree_status,
+            dtype=np.int32,
+            suffix=engine_worker_queue_port,
             create=False,
         )
 
@@ -237,7 +245,7 @@ class PrefixCacheManager:
             logger.info("Enable hierarchical cache.")
             threading.Thread(target=self.recv_data_transfer_result).start()
 
         if cache_config.enable_prefix_caching:
-            threading.Thread(target=self.clear_prefix_cache, args=(pid_suffix,), daemon=True).start()
+            threading.Thread(target=self.clear_prefix_cache, daemon=True).start()
         return cache_manager_processes
 
@@ -1358,19 +1366,12 @@ class PrefixCacheManager:
         main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
         main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
 
-    def clear_prefix_cache(self, pid_suffix):
+    def clear_prefix_cache(self):
         """
         If the model weights status is updating or clearing, reset prefix cache tree
         """
         logger.info("Start a thread to clear prefix cache when model weights are cleared.")
-        prefix_tree_status = np.zeros([1], dtype=np.int32)
-        prefix_tree_status_signal = IPCSignal(
-            name="prefix_tree_status",
-            array=prefix_tree_status,
-            dtype=np.int32,
-            suffix=pid_suffix,
-            create=False,
-        )
+        prefix_tree_status_signal = self.prefix_tree_status_signal
         while True:
             if prefix_tree_status_signal.value[0] == PrefixTreeStatus.CLEARING:
                 self.reset()
diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 3a5215be8..38e671435 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -178,7 +178,7 @@ class EngineService:
             create=True,
         )
 
-        cache_ready_signal_data = np.zeros(shape=[self.cfg.tensor_parallel_size], dtype=np.int32)
+        cache_ready_signal_data = np.zeros(shape=[self.cfg.parallel_config.tensor_parallel_size], dtype=np.int32)
         self.cache_ready_signal = IPCSignal(
             name="cache_ready_signal",
             array=cache_ready_signal_data,
@@ -187,7 +187,7 @@ class EngineService:
             create=True,
         )
 
-        swap_space_ready_signal_data = np.zeros(shape=[self.cfg.tensor_parallel_size], dtype=np.int32)
+        swap_space_ready_signal_data = np.zeros(shape=[self.cfg.parallel_config.tensor_parallel_size], dtype=np.int32)
         self.swap_space_ready_signal = IPCSignal(
             name="swap_space_ready_signal",
             array=swap_space_ready_signal_data,
diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py
index 9d8364718..3905b3fe6 100644
--- a/fastdeploy/engine/engine.py
+++ b/fastdeploy/engine/engine.py
@@ -631,12 +631,6 @@ class LLMEngine:
         num_gpu_blocks = self.get_profile_block_num_signal.value[0]
         self.cfg.cache_config.reset(num_gpu_blocks)
         self.engine.resource_manager.reset_cache_config(self.cfg.cache_config)
-        max_running_requests = num_gpu_blocks * self.cfg.cache_config.block_size // self.cfg.max_model_len
-        console_logger.info(
-            f"Detected {num_gpu_blocks} available gpu blocks in cache. "
-            f"FastDeploy will be serving {max_running_requests} running requests "
-            f"if each sequence reaches its maximum length: {self.cfg.max_model_len}"
-        )
         if self.cfg.cache_config.enable_prefix_caching or self.cfg.splitwise_role != "mixed":
             device_ids = self.cfg.device_ids.split(",")
             self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix, self.cfg.splitwise_role != "mixed")
diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index 3fd9b41c0..ff3ae3eb4 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -117,16 +117,12 @@ class EngineClient:
             suffix=port,
             create=False,
         )
-        self.connection_manager = DealerConnectionManager(
-            pid, max_connections=int(os.getenv("FD_DEALER_CONNECTIONS", 50))
-        )
-        self.connection_initialized = False
         prefix_tree_status = np.zeros([1], dtype=np.int32)
         self.prefix_tree_status_signal = IPCSignal(
             name="prefix_tree_status",
             array=prefix_tree_status,
             dtype=np.int32,
-            suffix=pid,
+            suffix=port,
             create=False,
         )
         kv_cache_status = np.zeros([1], dtype=np.int32)
@@ -134,9 +130,13 @@ class EngineClient:
             name="kv_cache_status",
             array=kv_cache_status,
             dtype=np.int32,
-            suffix=pid,
+            suffix=port,
             create=False,
         )
+        self.connection_manager = DealerConnectionManager(
+            pid, max_connections=int(os.getenv("FD_DEALER_CONNECTIONS", 50))
+        )
+        self.connection_initialized = False
         self.clear_update_lock = threading.Lock()
 
     def create_zmq_client(self, model, mode):
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index e25515549..e77d9a18f 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -1151,7 +1151,7 @@ class GPUModelRunner(ModelRunnerBase):
             name="cache_ready_signal",
             array=cache_ready_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )
 
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 215903f4a..452168783 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -175,7 +175,7 @@ class PaddleDisWorkerProc:
             name="launched_expert_service_signal",
             array=launched_expert_service_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )
         while self.launched_expert_service_signal.value[self.local_rank % self.max_chips_per_node] == 0:
@@ -192,7 +192,7 @@ class PaddleDisWorkerProc:
             name="worker_ready_signal",
             array=workers_ready,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )
         self.worker_ready_signal.value[self.local_rank % self.max_chips_per_node] = 1
@@ -391,25 +391,14 @@ class PaddleDisWorkerProc:
                 name="get_profile_block_num",
                 array=get_profile_block_num,
                 dtype=np.int32,
-                suffix=self.parallel_config.engine_pid,
+                suffix=self.parallel_config.engine_worker_queue_port,
                 create=False,
             )
             self.get_profile_block_num_signal.value[0] = num_blocks_local
         else:
             num_blocks_local = self.fd_config.parallel_config.total_block_num
         logger.info(f"------- num_blocks_global: {num_blocks_local} --------")
 
-        # wait engine launch cache_manager
-        # if self.parallel_config.splitwise_role != "mixed":
-        #     launched_cache_manager_signal_data = np.zeros([1], dtype=np.int32)
-        #     self.launched_cache_manager_signal = IPCSignal(
-        #         name="launched_cache_manager_signal",
-        #         array=launched_cache_manager_signal_data,
-        #         dtype=np.int32,
-        #         suffix=self.parallel_config.engine_pid,
-        #         create=False,
-        #     )
-        #     while np.any(self.launched_cache_manager_signal.value[0] <= 0):
-        #         time.sleep(0.01)
+
         # 4. init kv_cache with accurate num_blocks
         self.worker.initialize_cache(num_gpu_blocks=num_blocks_local)
 
@@ -444,7 +433,7 @@ class PaddleDisWorkerProc:
             name="loaded_model_signal",
             array=loaded_model_signal_data,
             dtype=np.int32,
-            suffix=self.parallel_config.engine_pid,
+            suffix=self.parallel_config.engine_worker_queue_port,
             create=False,
         )
         if self.ranks > 1: