[Feature] support get_task with tensor (#4751)

* [Feature] support get_task with tensor

* set FD_ENABLE_E2W_TENSOR_CONVERT default 0
This commit is contained in:
lizhenyun01
2025-11-04 11:00:13 +08:00
committed by GitHub
parent 9835697163
commit d65e00f8fb
2 changed files with 41 additions and 1 deletions

View File

@@ -113,6 +113,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
"FD_ENABLE_RETURN_TEXT": lambda: bool(int(os.getenv("FD_ENABLE_RETURN_TEXT", "0"))),
"ENCODE_FEATURE_BOS_AK": lambda: os.getenv("ENCODE_FEATURE_BOS_AK"),
"ENCODE_FEATURE_BOS_SK": lambda: os.getenv("ENCODE_FEATURE_BOS_SK"),
"FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")),
}

View File

@@ -27,7 +27,9 @@ from queue import Queue
from typing import Any, List, Tuple
import numpy as np
import paddle
from fastdeploy import envs
from fastdeploy.utils import llm_logger
@@ -246,7 +248,7 @@ class EngineWorkerQueue:
self.lock.release()
time.sleep(0.001)
self.lock.acquire()
EngineWorkerQueue.to_tensor(tasks)
self.tasks[:] = list()
self.client_read_flag[:] = [0] * self.num_client
self.tasks.append(tasks)
@@ -269,6 +271,43 @@ class EngineWorkerQueue:
self.lock.release()
return tasks, all_client_read
@staticmethod
def to_tensor(tasks):
    """
    Convert selected multimodal input fields to Paddle tensors, in place.

    The conversion is opt-in: the function returns immediately unless
    ``envs.FD_ENABLE_E2W_TENSOR_CONVERT`` is truthy. For every task that
    carries a non-empty ``multimodal_inputs`` mapping, each of the keys
    listed in ``tensor_keys`` whose value is neither ``None`` nor already
    a ``paddle.Tensor`` is replaced with ``paddle.to_tensor(value)``.

    The conversion is best-effort: any exception raised while unpacking
    ``tasks`` or converting a value is logged as a warning and swallowed,
    which also stops the conversion of the remaining tasks in the batch.

    Args:
        tasks (tuple): ([request], bsz)

    Returns:
        None. The ``multimodal_inputs`` mappings of the tasks are mutated.
    """
    if not getattr(envs, "FD_ENABLE_E2W_TENSOR_CONVERT", False):
        return

    # tensor keys -- loop-invariant, so built once per call rather than once
    # per task. Kept as a list so the log line below renders unchanged.
    tensor_keys = [
        "patch_idx",
        "token_type_ids",
        "position_ids",
        "attention_mask_offset",
    ]

    try:
        batch_tasks, _ = tasks
        for task in batch_tasks:
            multimodal_inputs = getattr(task, "multimodal_inputs", None)
            if not multimodal_inputs:
                # Task has no multimodal payload (or an empty one).
                continue

            llm_logger.info(f"Converting multimodal inputs to tensor...{tensor_keys}")
            for key in tensor_keys:
                value = multimodal_inputs.get(key)
                # Skip absent fields and values that are already tensors.
                if value is None or isinstance(value, paddle.Tensor):
                    continue
                multimodal_inputs[key] = paddle.to_tensor(value)
    except Exception as e:
        # Deliberately broad: a failed conversion must not break the queue
        # operation that called this helper.
        llm_logger.warning(f"Tensor conversion failed: {type(e).__name__}: {e}")
def num_tasks(self) -> int:
"""
Get current number of tasks in the queue.