diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index fde2ba204..45f7bf7f2 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -700,7 +700,7 @@ class EngineService: if self.resource_manager.available_batch() == 0: time.sleep(0.001) continue - if self.engine_worker_queue.num_tasks() > 0: + if self.engine_worker_queue.exist_tasks(): time.sleep(0.001) continue if hasattr(self, "exist_prefill_task_signal") and self.exist_prefill_task_signal.value[0] > 0: diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index 1481379e5..7489b3ea0 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -29,6 +29,7 @@ from typing import Any, List, Tuple import numpy as np from fastdeploy import envs +from fastdeploy.inter_communicator.ipc_signal import IPCSignal from fastdeploy.utils import llm_logger, to_tensor @@ -65,6 +66,8 @@ class EngineWorkerQueue: self.client_id: int = client_id self.local_data_parallel_size = local_data_parallel_size self.local_data_parallel_id = local_data_parallel_id + # Store whether this is a single-node deployment for consistent checking + self.is_single_node: bool = address[0] == "0.0.0.0" class QueueManager(BaseManager): """ @@ -82,6 +85,9 @@ class EngineWorkerQueue: self.lock_init: List[threading.Lock] = [threading.Lock() for _ in range(self.local_data_parallel_size)] self.read_finish_flag_init: List[Value] = [Value("i", 0) for _ in range(self.local_data_parallel_size)] + self.exist_tasks_inter_signal_init: List[Value] = [ + Value("i", 0) for _ in range(self.local_data_parallel_size) + ] self.connected_client_counter_init: List[Value] = [ Value("i", 0) for _ in range(self.local_data_parallel_size) ] @@ -202,6 +208,11 @@ class EngineWorkerQueue: callable=lambda idx: self.read_finish_flag_init[idx], proxytype=ValueProxy, ) + QueueManager.register( + "get_exist_tasks_inter_signal", + callable=lambda idx: self.exist_tasks_inter_signal_init[idx], + proxytype=ValueProxy, + ) QueueManager.register( "get_can_put_next_connect_task_response_flag", callable=lambda idx: self.can_put_next_connect_task_response_flag_init[idx], @@ -337,6 +348,7 @@ class EngineWorkerQueue: QueueManager.register("get_client_read_flag") QueueManager.register("get_lock") QueueManager.register("get_read_finish_flag") + QueueManager.register("get_exist_tasks_inter_signal") QueueManager.register("get_connected_client_counter") QueueManager.register("get_finish_request_queue") QueueManager.register("get_finish_add_cache_task_queue") @@ -373,6 +385,9 @@ class EngineWorkerQueue: self.client_read_flag: ListProxy = self.manager.get_client_read_flag(self.local_data_parallel_id) self.lock: AcquirerProxy = self.manager.get_lock(self.local_data_parallel_id) self.read_finish_flag: ValueProxy = self.manager.get_read_finish_flag(self.local_data_parallel_id) + self.exist_tasks_inter_signal: ValueProxy = self.manager.get_exist_tasks_inter_signal( + self.local_data_parallel_id + ) self.connected_client_counter: ValueProxy = self.manager.get_connected_client_counter( self.local_data_parallel_id ) @@ -433,6 +448,19 @@ class EngineWorkerQueue: assert self.num_client == len(self.client_read_flag) + # Only initialize shared memory for single-node deployments + if self.is_single_node: + exist_tasks_intra_signal_data = np.zeros([1], dtype=np.int32) + self.exist_tasks_intra_signal = IPCSignal( + name="exist_tasks_intra_signal", + array=exist_tasks_intra_signal_data, + dtype=np.int32, + suffix=self.get_server_port() if is_server else address[1], + create=is_server, + ) + else: + self.exist_tasks_intra_signal = None + if is_server: llm_logger.info("EngineWorkerQueue server started.") else: @@ -454,6 +482,41 @@ class EngineWorkerQueue: raise RuntimeError("Only the server instance can provide the port.") return self.address[1] + def exist_tasks(self) -> bool: + """ + Check if there are tasks in the queue without acquiring lock. + + For single-node deployments (address="0.0.0.0"), uses shared memory signal (faster). + For multi-node deployments, uses inter-process communication. + + This method is more efficient than num_tasks() when you only need to know + whether tasks exist, as it doesn't require acquiring a lock. + + Returns: + bool: True if tasks exist in the queue, False otherwise. + """ + if self.is_single_node: + return self.exist_tasks_intra_signal.value[0] == 1 + else: + return self.exist_tasks_inter_signal.get() == 1 + + def set_exist_tasks(self, flag: bool) -> None: + """ + Set the task existence flag to indicate whether tasks are available in the queue. + + This method updates a shared signal that is checked by workers to determine if + tasks are available for processing. It is called when tasks are added to the queue + (set to True) or when all clients have read the tasks (set to False). + + Args: + flag: True to indicate tasks exist in the queue, False to indicate no tasks. + """ + value = 1 if flag else 0 + if self.is_single_node: + self.exist_tasks_intra_signal.value[0] = value + else: + self.exist_tasks_inter_signal.set(value) + def _connect_with_retry(self, max_retries: int = 5, interval: int = 3) -> None: """ Connect to the server with retry mechanism. @@ -494,6 +557,7 @@ class EngineWorkerQueue: self.tasks[:] = list() self.client_read_flag[:] = [0] * self.num_client self.tasks.append(tasks) + self.set_exist_tasks(True) self.lock.release() llm_logger.debug(f"put_tasks: tasks={tasks}") @@ -513,6 +577,7 @@ class EngineWorkerQueue: all_client_read: bool = np.sum(self.client_read_flag) == self.num_client if all_client_read: self.tasks[:] = list() + self.set_exist_tasks(False) self.lock.release() llm_logger.debug(f"get_tasks: tasks={tasks}") return tasks, all_client_read diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 980f0c1b3..8057648f1 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -441,7 +441,7 @@ class PaddleDisWorkerProc: # The first worker detects whether there are tasks in the task queue if tp_rank == 0: - if self.task_queue.num_tasks() > 0: + if self.task_queue.exist_tasks(): if envs.ENABLE_V1_KVCACHE_SCHEDULER or not ( self.fd_config.model_config.enable_mm and self.worker.exist_prefill() ):