From 0d7f29841d62c5ae01aec74bbb7e8488dae82c50 Mon Sep 17 00:00:00 2001
From: chenjian <1435317881@qq.com>
Date: Thu, 6 Nov 2025 14:01:42 +0800
Subject: [PATCH] [Cherry-pick] Fix bug in develop for eb5 (#4845)

* fix bug for PD EP

* fix

* optimize perf for engine worker queue

* fix bug

* fix internode ll two stage

* fix for ci
---
 fastdeploy/cache_manager/cache_messager.py    | 10 ++++-
 fastdeploy/engine/async_llm.py                | 11 ++++--
 fastdeploy/engine/common_engine.py            | 28 +++++++-------
 fastdeploy/engine/engine.py                   | 16 +++++---
 fastdeploy/engine/request.py                  |  1 +
 fastdeploy/envs.py                            |  2 +
 .../inter_communicator/engine_worker_queue.py | 38 +++++++++++++------
 .../layers/moe/fused_moe_backend_base.py      | 18 +++++++--
 fastdeploy/splitwise/splitwise_connector.py   |  7 +++-
 fastdeploy/worker/gpu_model_runner.py         |  4 ++
 fastdeploy/worker/worker_process.py           | 11 ++++--
 11 files changed, 101 insertions(+), 45 deletions(-)

diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py
index e6e6aa152..78bed925c 100644
--- a/fastdeploy/cache_manager/cache_messager.py
+++ b/fastdeploy/cache_manager/cache_messager.py
@@ -132,7 +132,10 @@ class CacheMessager:
         self.gpu_cache_kvs = gpu_cache_kvs
         self.rank = rank
         self.nranks = nranks
-        address = (pod_ip, engine_worker_queue_port)
+        if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
+            address = (pod_ip, engine_worker_queue_port)
+        else:
+            address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock"
         self.engine_worker_queue = EngineWorkerQueue(
             address=address,
             is_server=False,
@@ -420,7 +423,10 @@ class CacheMessagerV1:
         self.gpu_cache_kvs = gpu_cache_kvs
         self.rank = rank
         self.nranks = nranks
-        address = (pod_ip, engine_worker_queue_port)
+        if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
+            address = (pod_ip, engine_worker_queue_port)
+        else:
+            address = f"/dev/shm/fd_task_queue_{engine_worker_queue_port}.sock"
         self.engine_worker_queue = EngineWorkerQueue(
             address=address,
             is_server=False,
diff --git a/fastdeploy/engine/async_llm.py b/fastdeploy/engine/async_llm.py
index 11f809b99..936f83477 100644
--- a/fastdeploy/engine/async_llm.py
+++ b/fastdeploy/engine/async_llm.py
@@ -923,10 +923,13 @@ class AsyncLLMEngine:
                 1,
                 self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
             ):
-                address = (
-                    self.cfg.master_ip,
-                    int(self.cfg.parallel_config.engine_worker_queue_port[i]),
-                )
+                if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
+                    address = (
+                        self.cfg.master_ip,
+                        int(self.cfg.parallel_config.engine_worker_queue_port[i]),
+                    )
+                else:
+                    address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
                 llm_logger.info(f"dp start queue service {address}")
                 self.dp_engine_worker_queue_server.append(
                     EngineWorkerQueue(
diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index bc5f86853..72efede20 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -263,10 +263,16 @@ class EngineService:
         """
         start queue service for engine worker communication
         """
-        address = (
-            self.cfg.master_ip,
-            int(self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]),
-        )
+
+        if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
+            address = (
+                self.cfg.master_ip,
+                int(
+                    self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]
+                ),
+            )
+        else:
+            address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]}.sock"

         if start_queue and (self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0"):
             self.llm_logger.info(f"Starting engine worker queue server service at {address}")
@@ -277,16 +283,10 @@ class EngineService:
                 local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
             )
             # Dynamically updates the port value if an anonymous port is used
-            self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id] = str(
-                self.engine_worker_queue_server.get_server_port()
-            )
-            address = (
-                self.cfg.master_ip,
-                int(
-                    self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]
-                ),
-            )
-
+            if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
+                self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id] = (
+                    str(self.engine_worker_queue_server.get_server_port())
+                )
         if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
             self.cache_task_queue = EngineCacheQueue(
                 address=(
diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py
index fb57f2abe..5aa1041a4 100644
--- a/fastdeploy/engine/engine.py
+++ b/fastdeploy/engine/engine.py
@@ -709,7 +709,9 @@ class LLMEngine:
             for i in range(self.cfg.parallel_config.data_parallel_size):
                 request_queues_for_dp_ipc.append(multiprocessing.Queue())
             self.engine.scheduler.start(
-                self.cfg.node_rank * self.cfg.worker_num_per_node, request_queues_for_dp_ipc, result_queue_for_dp_ipc
+                self.cfg.node_rank * self.cfg.worker_num_per_node % self.cfg.worker_num_per_node,
+                request_queues_for_dp_ipc,
+                result_queue_for_dp_ipc,
             )

         if not envs.FD_ENABLE_MULTI_API_SERVER:
@@ -721,10 +723,14 @@ class LLMEngine:
                 1,
                 self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
             ):
-                address = (
-                    self.cfg.master_ip,
-                    int(self.cfg.parallel_config.engine_worker_queue_port[i]),
-                )
+                if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
+                    address = (
+                        self.cfg.master_ip,
+                        int(self.cfg.parallel_config.engine_worker_queue_port[i]),
+                    )
+                else:
+                    address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
+
                 llm_logger.info(f"dp start queue service {address}")
                 self.dp_engine_worker_queue_server.append(
                     EngineWorkerQueue(
diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py
index d09066855..8f5a82ff5 100644
--- a/fastdeploy/engine/request.py
+++ b/fastdeploy/engine/request.py
@@ -161,6 +161,7 @@ class Request:
         self.extend_block_tables = []
         # dp
         self.dp_rank = dp_rank
+        self.llm_engine_recv_req_timestamp = time.time()

     @classmethod
     def from_dict(cls, d: dict):
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 65aa35df1..405940e58 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -155,6 +155,8 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"),
     # Enable offline perf test mode for PD disaggregation
     "FD_OFFLINE_PERF_TEST_FOR_PD": lambda: int(os.getenv("FD_OFFLINE_PERF_TEST_FOR_PD", "0")),
+    "FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")),
+    "FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "0")),
 }


diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py
index be4880e17..42423b6c5 100644
--- a/fastdeploy/inter_communicator/engine_worker_queue.py
+++ b/fastdeploy/inter_communicator/engine_worker_queue.py
@@ -485,24 +485,38 @@ class EngineWorkerQueue:
     @staticmethod
     def to_tensor(tasks):
         """
-        Convert NumPy arrays in multimodal inputs to PaddlePaddle tensors.
+        Convert NumPy arrays in multimodal inputs to Paddle tensors.

         Args:
-            tasks: List of tasks containing multimodal inputs.
+            tasks (tuple): ([request], bsz)
         """
+        if (not envs.FD_ENABLE_MAX_PREFILL) and (not envs.FD_ENABLE_E2W_TENSOR_CONVERT):
+            return
         try:
-            if envs.FD_ENABLE_MAX_PREFILL:
-                llm_logger.debug(f"Convert image to tensor, type: {type(tasks)}")
-                batch_tasks, _ = tasks
-                for task in batch_tasks:
-                    if not hasattr(task, "multimodal_inputs"):
+            batch_tasks, _ = tasks
+            for task in batch_tasks:
+                multimodal_inputs = getattr(task, "multimodal_inputs", None)
+                if not multimodal_inputs:
+                    continue
+                # tensor keys
+                tensor_keys = [
+                    "images",
+                    "patch_idx",
+                    "token_type_ids",
+                    "position_ids",
+                    "attention_mask_offset",
+                ]
+
+                llm_logger.debug(f"Converting multimodal inputs to tensor...{tensor_keys}")
+
+                for key in tensor_keys:
+                    value = multimodal_inputs.get(key)
+                    if value is None:
                         continue
-                    images = task.multimodal_inputs["images"]
-                    if isinstance(images, np.ndarray):
-                        llm_logger.debug(f"Convert image to tensor, shape: {images.shape}")
-                        task.multimodal_inputs["images"] = paddle.to_tensor(images)
+                    if not isinstance(value, paddle.Tensor):
+                        multimodal_inputs[key] = paddle.to_tensor(value)
         except Exception as e:
-            llm_logger.warning(f"Failed to convert to tensor: {e}")
+            llm_logger.warning(f"Tensor conversion failed: {type(e).__name__}: {e}")

     @staticmethod
     def to_numpy(tasks):
diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py
index e4247e0a5..978345a89 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py
@@ -108,10 +108,22 @@ class MoEMethodBase(QuantMethodBase):

         # For non-mixed ep
         phase = config.model_config.moe_phase.phase
-        if phase == "prefill":
-            self.ep_prefill_runner = self.EPPrefillRunner(**common_args)
+        if current_platform.is_cuda():
+            if phase == "prefill":
+                self.ep_prefill_runner = self.EPPrefillRunner(
+                    **common_args,
+                    use_internode_ll_two_stage=layer.fd_config.parallel_config.use_internode_ll_two_stage,
+                )
+            else:
+                self.ep_decoder_runner = self.EPDecoderRunner(
+                    **common_args,
+                    use_internode_ll_two_stage=layer.fd_config.parallel_config.use_internode_ll_two_stage,
+                )
         else:
-            self.ep_decoder_runner = self.EPDecoderRunner(**common_args)
+            if phase == "prefill":
+                self.ep_prefill_runner = self.EPPrefillRunner(**common_args)
+            else:
+                self.ep_decoder_runner = self.EPDecoderRunner(**common_args)

     def process_loaded_weights(self, layer, weights) -> None:
         """
diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py
index ea4022390..a5bbdb6be 100644
--- a/fastdeploy/splitwise/splitwise_connector.py
+++ b/fastdeploy/splitwise/splitwise_connector.py
@@ -329,8 +329,13 @@ class SplitwiseConnector:
         Parameters:
             port (int): Port number.
""" + if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM: + address = ("0.0.0.0", int(port)) + else: + address = f"/dev/shm/fd_task_queue_{port}.sock" + self.connect_innode_instances[port] = EngineWorkerQueue( - address=("0.0.0.0", int(port)), + address=address, num_client=self.cfg.parallel_config.tensor_parallel_size, client_id=0, ) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 5178e2b8c..53583962c 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -569,6 +569,10 @@ class GPUModelRunner(ModelRunnerBase): if request.sampling_params is not None and request.sampling_params.prompt_logprobs is not None: self.prompt_logprobs_reqs[request.request_id] = request has_prefill_task = True + if ( + self.fd_config.scheduler_config.splitwise_role == "decode" + ): # In PD, we continue to decode after P generate first token + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 elif request.task_type.value == RequestType.DECODE.value: # decode task logger.debug(f"Handle decode request {request} at idx {idx}") encoder_block_num = len(request.block_tables) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 2b9a466e3..f5e92cd1c 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -554,10 +554,13 @@ class PaddleDisWorkerProc: def start_task_queue_service(self): # Initialize task queue - task_address = ( - self.parallel_config.pod_ip, - self.parallel_config.engine_worker_queue_port, - ) + if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM: + task_address = ( + self.parallel_config.pod_ip, + self.parallel_config.engine_worker_queue_port, + ) + else: + task_address = f"/dev/shm/fd_task_queue_{self.parallel_config.engine_worker_queue_port}.sock" logger.info(f"connect task queue address {task_address}") self.task_queue = TaskQueue( address=task_address,