[Optim] Improve task-checking performance in engine-worker-queue (#5376)

* [Optim] Optimize the time cost of checking tasks in engine-worker-queue (see the sketch just before the diff below)

* Update fastdeploy/engine/common_engine.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update fastdeploy/inter_communicator/engine_worker_queue.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [Docs] Add docstring to set_exist_tasks method (#5382)

* Initial plan

* Add docstring to set_exist_tasks method

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

* [Docs] Add docstring to the exist_tasks() method (#5381)

* Initial plan

* Add comprehensive docstring to exist_tasks() method

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

* [Optimization] Conditionally initialize shared memory for single-node deployments only (#5383)

* Initial plan

* Conditionally initialize exist_tasks_intra_signal for single-node deployments

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

* Use is_single_node flag for consistent deployment type checking

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

* Remove redundant None checks in exist_tasks methods

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>

* format code

---------

Co-authored-by: Jiang-Jia-Jun <jiangjiajun@baidu.com>
Co-authored-by: YuBaoku <49938469+EmmonsCurse@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Author: Jiang-Jia-Jun
Date: 2025-12-11 10:33:32 +08:00
Committed by: GitHub
Commit: 4b3e41c665 (parent: 2ec76352da)
Changes: 3 changed files with 67 additions and 2 deletions
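Before the diff itself, a minimal standalone sketch of the pattern this commit adopts. It is not FastDeploy code: the manager-backed list, the manager lock, and the raw shared integer below are simplified stand-ins for the queue's task list, its lock, and the new existence flag. The sketch only illustrates why a plain flag read is cheaper than a locked, proxy-backed num_tasks()-style check.

# --- Illustrative sketch only (not part of the commit): why a flag read beats a locked check ---
import time
from multiprocessing import Manager, Value


def main():
    manager = Manager()
    tasks = manager.list()                   # stand-in for the manager-backed task list
    lock = manager.Lock()                    # stand-in for the queue lock
    exist_flag = Value("i", 0, lock=False)   # stand-in for the shared existence flag

    def check_via_lock() -> bool:
        # Old-style check: acquire the lock and query the proxied list;
        # every call is a round-trip to the manager process.
        with lock:
            return len(tasks) > 0

    def check_via_flag() -> bool:
        # New-style check: read a shared integer; no lock, no proxy call.
        return exist_flag.value == 1

    for name, check in [("locked length check", check_via_lock),
                        ("shared-flag check", check_via_flag)]:
        start = time.perf_counter()
        for _ in range(1000):
            check()
        print(f"{name}: {time.perf_counter() - start:.4f}s for 1000 calls")


if __name__ == "__main__":
    main()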

--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py

@@ -700,7 +700,7 @@ class EngineService:
 if self.resource_manager.available_batch() == 0:
     time.sleep(0.001)
     continue
-if self.engine_worker_queue.num_tasks() > 0:
+if self.engine_worker_queue.exist_tasks():
     time.sleep(0.001)
     continue
 if hasattr(self, "exist_prefill_task_signal") and self.exist_prefill_task_signal.value[0] > 0:

--- a/fastdeploy/inter_communicator/engine_worker_queue.py
+++ b/fastdeploy/inter_communicator/engine_worker_queue.py

@@ -29,6 +29,7 @@ from typing import Any, List, Tuple
 import numpy as np
 from fastdeploy import envs
+from fastdeploy.inter_communicator.ipc_signal import IPCSignal
 from fastdeploy.utils import llm_logger, to_tensor
@@ -65,6 +66,8 @@ class EngineWorkerQueue:
 self.client_id: int = client_id
 self.local_data_parallel_size = local_data_parallel_size
 self.local_data_parallel_id = local_data_parallel_id
+# Store whether this is a single-node deployment for consistent checking
+self.is_single_node: bool = address[0] == "0.0.0.0"

 class QueueManager(BaseManager):
     """
@@ -82,6 +85,9 @@ class EngineWorkerQueue:
 self.lock_init: List[threading.Lock] = [threading.Lock() for _ in range(self.local_data_parallel_size)]
 self.read_finish_flag_init: List[Value] = [Value("i", 0) for _ in range(self.local_data_parallel_size)]
+self.exist_tasks_inter_signal_init: List[Value] = [
+    Value("i", 0) for _ in range(self.local_data_parallel_size)
+]
 self.connected_client_counter_init: List[Value] = [
     Value("i", 0) for _ in range(self.local_data_parallel_size)
 ]
@@ -202,6 +208,11 @@ class EngineWorkerQueue:
     callable=lambda idx: self.read_finish_flag_init[idx],
     proxytype=ValueProxy,
 )
+QueueManager.register(
+    "get_exist_tasks_inter_signal",
+    callable=lambda idx: self.exist_tasks_inter_signal_init[idx],
+    proxytype=ValueProxy,
+)
 QueueManager.register(
     "get_can_put_next_connect_task_response_flag",
     callable=lambda idx: self.can_put_next_connect_task_response_flag_init[idx],
@@ -337,6 +348,7 @@ class EngineWorkerQueue:
QueueManager.register("get_client_read_flag")
QueueManager.register("get_lock")
QueueManager.register("get_read_finish_flag")
QueueManager.register("get_exist_tasks_inter_signal")
QueueManager.register("get_connected_client_counter")
QueueManager.register("get_finish_request_queue")
QueueManager.register("get_finish_add_cache_task_queue")
@@ -373,6 +385,9 @@ class EngineWorkerQueue:
 self.client_read_flag: ListProxy = self.manager.get_client_read_flag(self.local_data_parallel_id)
 self.lock: AcquirerProxy = self.manager.get_lock(self.local_data_parallel_id)
 self.read_finish_flag: ValueProxy = self.manager.get_read_finish_flag(self.local_data_parallel_id)
+self.exist_tasks_inter_signal: ValueProxy = self.manager.get_exist_tasks_inter_signal(
+    self.local_data_parallel_id
+)
 self.connected_client_counter: ValueProxy = self.manager.get_connected_client_counter(
     self.local_data_parallel_id
 )
@@ -433,6 +448,19 @@ class EngineWorkerQueue:
 assert self.num_client == len(self.client_read_flag)

+# Only initialize shared memory for single-node deployments
+if self.is_single_node:
+    exist_tasks_intra_signal_data = np.zeros([1], dtype=np.int32)
+    self.exist_tasks_intra_signal = IPCSignal(
+        name="exist_tasks_intra_signal",
+        array=exist_tasks_intra_signal_data,
+        dtype=np.int32,
+        suffix=self.get_server_port() if is_server else address[1],
+        create=is_server,
+    )
+else:
+    self.exist_tasks_intra_signal = None
+
 if is_server:
     llm_logger.info("EngineWorkerQueue server started.")
 else:
@@ -454,6 +482,41 @@ class EngineWorkerQueue:
raise RuntimeError("Only the server instance can provide the port.")
return self.address[1]
def exist_tasks(self) -> bool:
"""
Check if there are tasks in the queue without acquiring lock.
For single-node deployments (address="0.0.0.0"), uses shared memory signal (faster).
For multi-node deployments, uses inter-process communication.
This method is more efficient than num_tasks() when you only need to know
whether tasks exist, as it doesn't require acquiring a lock.
Returns:
bool: True if tasks exist in the queue, False otherwise.
"""
if self.is_single_node:
return self.exist_tasks_intra_signal.value[0] == 1
else:
return self.exist_tasks_inter_signal.get() == 1
def set_exist_tasks(self, flag: bool) -> None:
"""
Set the task existence flag to indicate whether tasks are available in the queue.
This method updates a shared signal that is checked by workers to determine if
tasks are available for processing. It is called when tasks are added to the queue
(set to True) or when all clients have read the tasks (set to False).
Args:
flag: True to indicate tasks exist in the queue, False to indicate no tasks.
"""
value = 1 if flag else 0
if self.is_single_node:
self.exist_tasks_intra_signal.value[0] = value
else:
self.exist_tasks_inter_signal.set(value)
def _connect_with_retry(self, max_retries: int = 5, interval: int = 3) -> None:
"""
Connect to the server with retry mechanism.
@@ -494,6 +557,7 @@ class EngineWorkerQueue:
 self.tasks[:] = list()
 self.client_read_flag[:] = [0] * self.num_client
 self.tasks.append(tasks)
+self.set_exist_tasks(True)
 self.lock.release()
 llm_logger.debug(f"put_tasks: tasks={tasks}")
@@ -513,6 +577,7 @@ class EngineWorkerQueue:
 all_client_read: bool = np.sum(self.client_read_flag) == self.num_client
 if all_client_read:
     self.tasks[:] = list()
+    self.set_exist_tasks(False)
 self.lock.release()
 llm_logger.debug(f"get_tasks: tasks={tasks}")
 return tasks, all_client_read

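To recap the flag protocol that the hunks above add to EngineWorkerQueue: put_tasks() raises the flag, get_tasks() lowers it once every client has read the batch, and exist_tasks() is a constant-time flag read. The single-process model below is illustrative only; the real class keeps these fields behind manager proxies or the IPCSignal shared-memory wrapper and guards put/get with the queue lock.

# --- Condensed single-process model of the exist-tasks flag protocol (illustrative only) ---
class FlagProtocolModel:
    def __init__(self, num_clients: int):
        self.tasks = []
        self.client_read_flag = [1] * num_clients
        self.exist_tasks_flag = 0

    def put_tasks(self, batch) -> None:
        self.tasks = [batch]
        self.client_read_flag = [0] * len(self.client_read_flag)
        self.exist_tasks_flag = 1                  # mirrors set_exist_tasks(True)

    def get_tasks(self, client_id: int):
        batch = self.tasks[0] if self.tasks else None
        self.client_read_flag[client_id] = 1
        if all(self.client_read_flag):             # every client has read the batch
            self.tasks = []
            self.exist_tasks_flag = 0              # mirrors set_exist_tasks(False)
        return batch

    def exist_tasks(self) -> bool:
        return self.exist_tasks_flag == 1          # constant-time flag read, no lock


queue = FlagProtocolModel(num_clients=2)
queue.put_tasks(["request-1", "request-2"])
assert queue.exist_tasks()                         # flag raised by put_tasks
queue.get_tasks(0)
assert queue.exist_tasks()                         # still raised: client 1 has not read yet
queue.get_tasks(1)
assert not queue.exist_tasks()                     # cleared once all clients have read

The third changed file (the worker process, below) switches its polling check the same way as the engine loop above: exist_tasks() replaces num_tasks() > 0.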

@@ -441,7 +441,7 @@ class PaddleDisWorkerProc:
 # The first worker detects whether there are tasks in the task queue
 if tp_rank == 0:
-    if self.task_queue.num_tasks() > 0:
+    if self.task_queue.exist_tasks():
         if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
             self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
         ):
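Finally, a simplified stand-in for the single-node fast path that backs exist_tasks_intra_signal. This is not FastDeploy's IPCSignal class, which also manages signal names, suffixes, and cleanup; it only shows that on a single host the flag is an int32 in shared memory that any local process can read without taking a lock. The shared-memory name used here is made up for the demo.

# --- Simplified stand-in for the single-node shared-memory flag (not the IPCSignal wrapper) ---
import numpy as np
from multiprocessing import shared_memory

# "Server" side: create a one-element int32 block, initialised to 0 (no tasks).
shm = shared_memory.SharedMemory(create=True, size=4, name="exist_tasks_demo")
flag = np.ndarray((1,), dtype=np.int32, buffer=shm.buf)
flag[0] = 0

# "Worker" side: attach to the same block by name and read it without any lock.
worker_shm = shared_memory.SharedMemory(name="exist_tasks_demo")
worker_view = np.ndarray((1,), dtype=np.int32, buffer=worker_shm.buf)

flag[0] = 1                       # engine side: set_exist_tasks(True)
print(bool(worker_view[0] == 1))  # worker side: exist_tasks() -> True
flag[0] = 0                       # engine side: set_exist_tasks(False)
print(bool(worker_view[0] == 1))  # worker side: exist_tasks() -> False

# Release the numpy views before closing, then clean up the shared block.
del flag, worker_view
worker_shm.close()
shm.close()
shm.unlink()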