[Engine] [Feature] Refactor async_llm: cross-process with EngineService, based on ZMQ communication (#4868)

* Refactor async_llm:cross-process with EngineService

* fix: async_llm output process

* fix: return prompt_token_ids and prompt_tokens in first res

* optimize common_engine start func
This commit is contained in:
zhouchong
2025-12-09 10:53:40 +08:00
committed by GitHub
parent 2f208db4e9
commit 5d9b5e4a5b
8 changed files with 2217 additions and 1790 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -17,7 +17,13 @@
from __future__ import annotations
import copy
import json
import multiprocessing
import os
import re
import signal
import subprocess
import sys
import threading
import time
import traceback
@@ -30,6 +36,7 @@ import paddle
import requests
import zmq
from opentelemetry import trace
from tqdm import tqdm
from fastdeploy.engine.request import Request, RequestOutput, RequestType
from fastdeploy.engine.resource_manager import ResourceManager
@@ -66,7 +73,7 @@ class EngineService:
Base class containing common engine functionality
"""
def __init__(self, cfg, start_queue=True):
def __init__(self, cfg, start_queue=True, use_async_llm=False):
"""
Initializes the LLMEngine with the provided configuration.
@@ -74,6 +81,7 @@ class EngineService:
cfg (Config): Config object containing all the configuration parameters.
"""
self.cfg = cfg
self.use_async_llm = use_async_llm
if cfg.scheduler_config.splitwise_role != "mixed" or cfg.cache_config.enable_prefix_caching:
if isinstance(self.cfg.cache_config.cache_queue_port, str):
self.cfg.cache_config.cache_queue_port = self.cfg.cache_config.cache_queue_port.split(",")
@@ -149,10 +157,21 @@ class EngineService:
)
init_eplb_signals(cfg, current_suffix)
if self.use_async_llm:
# Add worker management attributes
self.worker_proc = None
self.do_profile = 1 if self.cfg.cache_config.num_gpu_blocks_override is None else 0
self.ipc_signal_suffix = None
self.cache_manager_processes = None
self._finalizer = weakref.finalize(self, self._exit_sub_services)
def start(self):
def start(self, async_llm_pid=None):
self.running = True
if self.use_async_llm:
self.start_worker_service(async_llm_pid)
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.insert_task_to_worker_thread = threading.Thread(
target=self._schedule_request_to_worker_v1, daemon=True
@@ -167,6 +186,69 @@ class EngineService:
self._register_to_router()
def start_worker_service(self, async_llm_pid=None):
    """
    Launch and supervise the worker processes used by AsyncLLM.

    Initializes worker IPC signals, creates the data processor if missing,
    launches auxiliary components, spawns the worker subprocess, waits for
    model loading to complete, and finally starts the ZMQ service used to
    communicate with the AsyncLLM front-end process.

    Args:
        async_llm_pid: PID of the AsyncLLM process; when provided, a ZMQ
            service keyed on that PID is started for cross-process I/O.

    Returns:
        bool: True when workers launched successfully, False otherwise.
            (Previously None was returned on success; returning True keeps
            all falsy-on-failure checks backward compatible.)
    """
    # Initialize IPC signals for worker management
    self.ipc_signal_suffix = self.cfg.parallel_config.engine_worker_queue_port[0]
    self._init_worker_signals()

    # Create data processor if not exists
    if not hasattr(self, "data_processor"):
        self.create_data_processor()

    # Launch components: scheduler, cache_manager, expert_service et.al.
    self.launch_components()

    # If block number is specified and model is deployed in splitwise mode,
    # start cache manager first
    if not self.do_profile and self.cfg.scheduler_config.splitwise_role != "mixed":
        device_ids = self.cfg.parallel_config.device_ids.split(",")
        self.cache_manager_processes = self.start_cache_service(device_ids, self.ipc_signal_suffix)

    # Start worker processes
    self.worker_proc = self._start_worker_service()
    time.sleep(5)  # give the subprocess time to boot before probing status
    self.worker_init_status = dict()

    result_container = {}

    def check_worker_initialize_status_func(res: dict):
        # Runs in a background thread; flips the flag if workers die early.
        res["worker_is_alive"] = True
        if not self.check_worker_initialize_status():
            llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
            res["worker_is_alive"] = False

    self.check_worker_initialize_status_func_thread = threading.Thread(
        target=check_worker_initialize_status_func, args=(result_container,), daemon=True
    )
    self.check_worker_initialize_status_func_thread.start()

    # Wait model loading
    while self.loaded_model_signal.value[0] == 0:
        # Make sure worker process is alive
        if not self.check_worker_initialize_status_func_thread.is_alive():
            return False
        time.sleep(1)

    # If block number is not specified, let workers do profiling to determine
    # the block number, and then start the cache manager
    if self.do_profile:
        self._stop_profile()
    elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching:
        device_ids = self.cfg.parallel_config.device_ids.split(",")
        self.cache_manager_processes = self.start_cache_service(device_ids, self.ipc_signal_suffix)

    # Set cache manager signal
    if self.cfg.scheduler_config.splitwise_role != "mixed":
        self.launched_cache_manager_signal.value[0] = 1

    # Worker launched
    self.check_worker_initialize_status_func_thread.join()
    if not result_container["worker_is_alive"]:
        llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
        return False

    # Start ZMQ service for communication with AsyncLLM
    if async_llm_pid:
        self.start_zmq_service(async_llm_pid)
    return True
def create_data_processor(self):
self.input_processor = InputPreprocessor(
self.cfg.model_config,
@@ -970,7 +1052,13 @@ class EngineService:
else:
err, data = self.recv_request_server.receive_pyobj_once(block)
if err is not None:
self.llm_logger.error(f"Engine stops inserting zmq task into scheduler, err:{err}")
# The message "Context was terminated" is normal when closing a ZMQ context
if "Context was terminated" in str(err):
self.llm_logger.info(
"Engine stops inserting zmq task into scheduler due to ZMQ context termination (normal shutdown)."
)
else:
self.llm_logger.error(f"Engine stops inserting zmq task into scheduler, err:{err}")
break
request, insert_task = None, []
@@ -1336,6 +1424,58 @@ class EngineService:
"""
llm_logger.info("Exit sub services.....")
self.running = False
if self.use_async_llm:
# Clean up worker processes first (before closing multiprocessing services)
if hasattr(self, "worker_proc") and self.worker_proc is not None:
llm_logger.info("Cleaning up worker processes...")
try:
pgid = os.getpgid(self.worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)
except Exception as e:
llm_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}")
# Clean up cache manager processes
if hasattr(self, "cache_manager_processes"):
llm_logger.info("Cleaning up cache manager processes...")
self.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear()
self.resource_manager.cache_manager.cache_ready_signal.clear()
for p in self.cache_manager_processes:
llm_logger.info(f"Killing cache manager process {p.pid}")
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
except Exception as e:
llm_logger.error(
f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
)
if hasattr(self, "cache_task_queue") and self.cache_task_queue is not None:
llm_logger.info("Cleaning up cache_task_queue...")
# Check if cleanup method exists
if hasattr(self.cache_task_queue, "cleanup"):
self.cache_task_queue.cleanup()
elif hasattr(self.cache_task_queue, "manager"):
try:
llm_logger.info("Shutting down cache_task_queue manager...")
self.cache_task_queue.manager.shutdown()
except Exception as e:
llm_logger.warning(f"Error shutting down cache_task_queue manager: {e}")
if hasattr(self, "get_profile_block_num_signal"):
self.get_profile_block_num_signal.clear()
self.worker_ready_signal.clear()
self.loaded_model_signal.clear()
# Clean up other services
if hasattr(self, "dp_processed"):
for p in self.dp_processed:
llm_logger.info(f"Waiting for worker {p.pid} to exit")
p.join()
for p in self.dp_engine_worker_queue_server:
p.cleanup()
if hasattr(self, "engine_worker_queue_server") and self.engine_worker_queue_server is not None:
self.engine_worker_queue_server.cleanup()
self.exist_task_signal.clear()
@@ -1353,3 +1493,395 @@ class EngineService:
self.recv_request_server.close()
if hasattr(self, "recv_control_cmd_server") and self.recv_control_cmd_server is not None:
self.recv_control_cmd_server.close()
# 从 async_llm 移到 common_engine
def _worker_processes_ready(self):
"""
judge if all worker processes are ready
"""
if np.sum(self.worker_ready_signal.value) == self.cfg.worker_num_per_node:
return True
return False
def _init_worker_signals(self):
    """
    Create the shared-memory IPC signals used to coordinate with workers.
    """
    # Lets worker processes detect whether the engine finished starting.
    self.worker_ready_signal = IPCSignal(
        name="worker_ready_signal",
        array=np.zeros(shape=[self.cfg.worker_num_per_node], dtype=np.int32),
        dtype=np.int32,
        suffix=self.ipc_signal_suffix,
        create=True,
    )

    # Signals whether the engine has launched the cache manager.
    if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
        self.launched_cache_manager_signal = IPCSignal(
            name="launched_cache_manager_signal",
            array=np.zeros([1], dtype=np.int32),
            dtype=np.int32,
            suffix=self.ipc_signal_suffix,
            create=True,
        )

    # Signals whether each expert_service started successfully.
    if self.cfg.parallel_config.enable_expert_parallel and self.cfg.parallel_config.data_parallel_size > 1:
        per_node_dp = self.cfg.parallel_config.data_parallel_size // self.cfg.nnode
        self.launched_expert_service_signal = IPCSignal(
            name="launched_expert_service_signal",
            array=np.zeros(shape=[per_node_dp], dtype=np.int32),
            dtype=np.int32,
            suffix=self.ipc_signal_suffix,
            create=True,
        )

    # Signals whether each worker has completed model loading.
    self.loaded_model_signal = IPCSignal(
        name="loaded_model_signal",
        array=np.zeros([1], dtype=np.int32),
        dtype=np.int32,
        suffix=self.ipc_signal_suffix,
        create=True,
    )

    if self.do_profile:
        # iluvatar devices report one profiled block count per worker;
        # other devices publish a single shared value.
        if paddle.is_compiled_with_custom_device("iluvatar_gpu"):
            profile_shape = [self.cfg.worker_num_per_node]
        else:
            profile_shape = [1]
        self.get_profile_block_num_signal = IPCSignal(
            name="get_profile_block_num",
            array=np.zeros(profile_shape, dtype=np.int32),
            dtype=np.int32,
            suffix=self.ipc_signal_suffix,
            create=True,
        )
def _setting_environ_variables(self):
"""
配置环境变量
"""
variables = {
"ENABLE_FASTDEPLOY_LOAD_MODEL_CONCURRENCY": 0,
"LOAD_STATE_DICT_THREAD_NUM": len(self.cfg.parallel_config.device_ids.split(",")),
"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": "python",
"FLAGS_use_append_attn": 1,
"NCCL_ALGO": "Ring",
"FLAGS_max_partition_size": int(os.getenv("FLAGS_max_partition_size", 1024)),
"OMP_NUM_THREADS": 3,
}
# environment variables needed by Dy2St
variables.update(
{
"SOT_LOG_LEVEL": os.getenv("SOT_LOG_LEVEL", default="0"),
"SOT_UNSAFE_CACHE_FASTPATH": os.getenv("SOT_UNSAFE_CACHE_FASTPATH", default="1"),
"SOT_ENABLE_0_SIZE_FALLBACK": os.getenv("SOT_ENABLE_0_SIZE_FALLBACK", default="0"),
"SOT_SPECIALIZED_DIM_NUMBERS": os.getenv("SOT_SPECIALIZED_DIM_NUMBERS", default="no"),
"FLAGS_specialize_device_in_dy2st": os.getenv("FLAGS_specialize_device_in_dy2st", default="1"),
"FLAGS_enable_async_fast_gc": os.getenv("FLAGS_enable_async_fast_gc", default="0"),
"FLAGS_pir_interpreter_record_stream_for_gc_cache": os.getenv(
"FLAGS_pir_interpreter_record_stream_for_gc_cache", default="1"
),
"FLAGS_parameters_persistent_mode_in_dy2st": os.getenv(
"FLAGS_parameters_persistent_mode_in_dy2st", default="1"
),
}
)
if self.cfg.scheduler_config.splitwise_role != "mixed":
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
variables["FLAGS_use_pd_disaggregation_per_chunk"] = 1
else:
variables["FLAGS_use_pd_disaggregation"] = 1
# TODO dynamic load environment variable
if self.cfg.scheduler_config.splitwise_role == "prefill":
variables["FLAGS_fmt_write_cache_completed_signal"] = 1
if self.cfg.model_config.enable_mm:
variables["FLAGS_max_partition_size"] = 1024
command_prefix = ""
for k, v in variables.items():
command_prefix += f"{k}={v} "
return command_prefix
def _start_worker_service(self):
    """
    start gpu worker service

    Builds the full `paddle.distributed.launch` command line (environment
    prefix, worker script path, model/parallel/cache arguments, store-true
    flags) and spawns the worker processes in their own process group.

    Returns:
        subprocess.Popen: handle to the launched worker process group.
    """
    log_dir = os.getenv("FD_LOG_DIR", default="log")
    command_prefix = self._setting_environ_variables()
    current_file_path = os.path.abspath(__file__)
    current_dir_path = os.path.split(current_file_path)[0]
    # TODO
    # "-u" disables stdout buffering unless caching of worker stdout is requested;
    # unbuffered output is what check_worker_initialize_status parses live.
    uncache_worker_stdout = "" if os.getenv("UNCACHE_WORKER_STDOUT", "0") == "1" else "-u"
    pd_cmd = f"{command_prefix} {sys.executable} {uncache_worker_stdout} -m paddle.distributed.launch"
    pd_cmd = pd_cmd + f" --log_dir {log_dir}"
    worker_path = "../worker/worker_process.py"
    py_script = os.path.join(current_dir_path, worker_path)

    # Original vocab size differs between sentencepiece-backed tokenizers
    # (sp_model) and plain vocab tokenizers.
    ori_vocab_size = (
        len(self.data_processor.tokenizer.sp_model)
        if hasattr(self.data_processor.tokenizer, "sp_model")
        else len(self.data_processor.tokenizer.vocab)
    )

    # Special token ids; -1 means "absent from the vocabulary".
    think_end_id = self.data_processor.tokenizer.get_vocab().get("</think>", -1)
    if think_end_id > 0:
        llm_logger.info(f"Get think_end_id {think_end_id} from vocab.")
    else:
        llm_logger.info("No </think> token found in vocabulary, the model can not do reasoning.")
    image_patch_id = self.data_processor.tokenizer.get_vocab().get("<|IMAGE_PLACEHOLDER|>", -1)
    line_break_id = self.data_processor.tokenizer.get_vocab().get("\n", -1)

    ports = ",".join(self.cfg.parallel_config.engine_worker_queue_port)
    ips = None
    if self.cfg.ips is not None:
        ips = ",".join(self.cfg.ips)

    # Positional/value arguments forwarded to worker_process.py.
    arguments = (
        f" --devices {self.cfg.parallel_config.device_ids} {py_script}"
        f" --max_num_seqs {self.cfg.scheduler_config.max_num_seqs} --max_model_len {self.cfg.model_config.max_model_len}"
        f" --gpu_memory_utilization {self.cfg.cache_config.gpu_memory_utilization}"
        f" --model {self.cfg.model_config.model!s}"
        f" --device_ids {self.cfg.parallel_config.device_ids}"
        f" --tensor_parallel_size {self.cfg.parallel_config.tensor_parallel_size}"
        f" --engine_worker_queue_port {ports}"
        f" --pod_ip {self.cfg.master_ip}"
        f" --block_size {self.cfg.cache_config.block_size}"
        f" --enc_dec_block_num {self.cfg.cache_config.enc_dec_block_num}"
        f" --eos_tokens_lens {self.data_processor.eos_token_id_len}"
        f" --pad_token_id {self.data_processor.pad_token_id}"
        f" --engine_pid {self.cfg.parallel_config.engine_worker_queue_port[0]}"
        f" --max_num_batched_tokens {self.cfg.scheduler_config.max_num_batched_tokens}"
        f" --splitwise_role {self.cfg.scheduler_config.splitwise_role}"
        f" --kv_cache_ratio {self.cfg.cache_config.kv_cache_ratio}"
        f" --expert_parallel_size {self.cfg.parallel_config.expert_parallel_size}"
        f" --chunked_moe_size {self.cfg.parallel_config.chunked_moe_size}"
        f" --data_parallel_size {self.cfg.parallel_config.data_parallel_size}"
        f" --quantization '{json.dumps(self.cfg.model_config.quantization)}'"
        f" --ori_vocab_size {ori_vocab_size}"
        f" --think_end_id {think_end_id}"
        f" --image_patch_id {image_patch_id}"
        f" --line_break_id {line_break_id}"
        f" --speculative_config '{self.cfg.speculative_config.to_json_string()}'"
        f" --graph_optimization_config '{self.cfg.graph_opt_config.to_json_string()}'"
        f" --guided_decoding_backend {self.cfg.structured_outputs_config.guided_decoding_backend}"
        f" --load_strategy {self.cfg.load_config.load_strategy}"
        f" --early_stop_config '{self.cfg.early_stop_config.to_json_string()}'"
        f" --reasoning_parser {self.cfg.structured_outputs_config.reasoning_parser}"
        f" --load_choices {self.cfg.load_config.load_choices}"
        f" --plas_attention_config '{self.cfg.plas_attention_config.to_json_string()}'"
        f" --ips {ips}"
        f" --cache-transfer-protocol {self.cfg.cache_config.cache_transfer_protocol}"
        f" --runner {self.cfg.model_config.runner}"
        f" --convert {self.cfg.model_config.convert}"
        f" --override-pooler-config {self.cfg.model_config.override_pooler_config}"
        f" --logprobs_mode {self.cfg.model_config.logprobs_mode}"
        f" --max_logprobs {self.cfg.model_config.max_logprobs}"
        f" --eplb_config '{self.cfg.eplb_config.to_json_string()}'"
    )
    if self.cfg.structured_outputs_config.logits_processors is not None:
        arguments += f" --logits-processors {' '.join(self.cfg.structured_outputs_config.logits_processors)}"

    # Boolean (store_true) flags: appended only when enabled.
    worker_store_true_flag = {
        "enable_expert_parallel": self.cfg.parallel_config.enable_expert_parallel,
        "enable_prefix_caching": self.cfg.cache_config.enable_prefix_caching,
        "enable_chunked_prefill": self.cfg.cache_config.enable_chunked_prefill,
        "do_profile": self.do_profile,
        "dynamic_load_weight": self.cfg.load_config.dynamic_load_weight,
        "disable_any_whitespace": self.cfg.structured_outputs_config.disable_any_whitespace,
        "disable_custom_all_reduce": self.cfg.parallel_config.disable_custom_all_reduce,
        "use_internode_ll_two_stage": self.cfg.parallel_config.use_internode_ll_two_stage,
        "disable_sequence_parallel_moe": self.cfg.parallel_config.disable_sequence_parallel_moe,
        "enable_logprob": self.cfg.model_config.enable_logprob,
        "lm_head_fp32": self.cfg.model_config.lm_head_fp32,
    }
    for worker_flag, value in worker_store_true_flag.items():
        if value:
            arguments = arguments + f" --{worker_flag}"

    # Flags that default to None and are only passed when explicitly set.
    worker_default_none_flag = {
        "num_gpu_blocks_override": self.cfg.cache_config.num_gpu_blocks_override,
    }
    for worker_flag, value in worker_default_none_flag.items():
        if value:
            arguments = arguments + f" --{worker_flag} {value}"

    if self.cfg.nnode > 1:
        pd_cmd = pd_cmd + f" --ips {ips} --nnodes {len(self.cfg.ips)}"
    pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
    llm_logger.info(f"Launch worker service command: {pd_cmd}")
    # os.setsid puts the workers in their own process group so the whole
    # group can be terminated together via os.killpg on shutdown.
    p = subprocess.Popen(
        pd_cmd,
        stdout=subprocess.PIPE,
        shell=True,
        preexec_fn=os.setsid,
    )
    return p
def _stop_profile(self):
"""
Stop profiling of the model server and reset variables.
"""
self.do_profile = 0
while self.get_profile_block_num_signal.value[0] == 0:
time.sleep(1)
num_gpu_blocks = self.get_profile_block_num_signal.value[0]
self.cfg.cache_config.reset(num_gpu_blocks)
self.resource_manager.reset_cache_config(self.cfg.cache_config)
if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
device_ids = self.cfg.parallel_config.device_ids.split(",")
self.cache_manager_processes = self.start_cache_service(device_ids, self.ipc_signal_suffix)
def check_health(self, time_interval_threashold=30):
    """
    Report engine health based on the workers' heartbeat signal.

    Args:
        time_interval_threashold: Maximum seconds since the last heartbeat
            before workers are considered unhealthy. (Misspelled name kept
            for backward compatibility with keyword callers.)

    Returns:
        tuple[bool, str]: (healthy, reason); reason is "" when healthy.
    """
    last_heartbeat = self.worker_healthy_live_signal.value[0]
    if last_heartbeat:
        stale_for = time.time() - last_heartbeat
        if stale_for > time_interval_threashold:
            return False, "Worker Service Not Healthy"
    return True, ""
def launch_components(self):
    """
    Launch auxiliary engine components: the splitwise receiver thread, the
    scheduler, and (for expert-parallel DP > 1) the per-rank worker queues
    and data-parallel expert services.
    """
    if self.cfg.scheduler_config.splitwise_role != "mixed":
        # Single-machine logic: background thread receiving disaggregated
        # tasks from the peer (prefill/decode) role.
        self.splitwise_receive_thread = threading.Thread(target=self.split_connector.start_receiver, args=())
        self.splitwise_receive_thread.daemon = True
        self.splitwise_receive_thread.start()

    role = self.cfg.scheduler_config.splitwise_role
    host_ip = self.cfg.host_ip
    disaggregate = self.cfg.disaggregate_info
    # Only populated for the "dp" scheduler, which uses IPC queues.
    request_queues_for_dp_ipc = None
    result_queue_for_dp_ipc = None
    if self.cfg.scheduler_config.name == "splitwise":
        self.scheduler.start(role, host_ip, disaggregate)
    elif self.cfg.scheduler_config.name == "dp":
        request_queues_for_dp_ipc = []
        result_queue_for_dp_ipc = multiprocessing.Queue()
        for i in range(self.cfg.parallel_config.data_parallel_size):
            request_queues_for_dp_ipc.append(multiprocessing.Queue())
        self.scheduler.start(
            self.cfg.node_rank * self.cfg.worker_num_per_node % self.cfg.worker_num_per_node,
            request_queues_for_dp_ipc,
            result_queue_for_dp_ipc,
        )

    if not envs.FD_ENABLE_MULTI_API_SERVER:
        if self.cfg.parallel_config.enable_expert_parallel and self.cfg.parallel_config.data_parallel_size > 1:
            # Rank 0's expert service is considered launched by this process.
            self.launched_expert_service_signal.value[0] = 1
            self.dp_processed = []
            self.dp_engine_worker_queue_server = []
            # Start one queue server + expert-service process per local DP
            # rank (rank 0 excluded — it runs in this process).
            for i in range(
                1,
                self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
            ):
                if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
                    address = (
                        self.cfg.master_ip,
                        int(self.cfg.parallel_config.engine_worker_queue_port[i]),
                    )
                else:
                    # Shared-memory transport uses a unix socket path instead.
                    address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
                llm_logger.info(f"dp start queue service {address}")
                self.dp_engine_worker_queue_server.append(
                    EngineWorkerQueue(
                        address=address,
                        is_server=True,
                        num_client=self.cfg.parallel_config.tensor_parallel_size,
                        local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
                    )
                )
                from fastdeploy.engine.expert_service import (
                    start_data_parallel_service,
                )

                self.dp_processed.append(
                    multiprocessing.Process(
                        target=start_data_parallel_service,
                        args=(
                            self.cfg,
                            i,
                        ),
                    )
                )
                llm_logger.info(
                    f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}"
                    + f" data parallel id {i}"
                )
                self.dp_processed[-1].start()
                # Block until this rank's expert service reports ready before
                # launching the next one.
                while self.launched_expert_service_signal.value[i] == 0:
                    time.sleep(1)
def check_worker_initialize_status(self):
    """
    Check the initialize status of workers by parsing their stdout logging.

    A daemon thread tails the worker subprocess stdout for checkpoint/layer
    loading progress while the main thread renders two tqdm progress bars
    (weights, then layers).

    Returns:
        bool: False as soon as the worker process exits prematurely;
        True once loading is considered finished.
    """

    def detect_thread():
        # Tail worker stdout and record loading progress into worker_init_status.
        for line in self.worker_proc.stdout:
            line = line.decode("utf-8", errors="ignore")
            if self.worker_init_status.get("finished", False):
                break
            if match := re.search(
                r"Loading (?:fastsafetensors |safetensors )?checkpoint shards:\s*(\d+)",
                line,
            ):
                # int() instead of eval(): eval raises SyntaxError on
                # zero-padded digits (e.g. "007") and needlessly executes
                # text derived from subprocess logs.
                self.worker_init_status["weight_loadding"] = int(match.group(1)) * 1.0 / 100
            elif (match := re.search(r"Start load layer (\d+)", line)) or (
                match := re.search(r"set state for layer (\d+)", line)
            ):
                progress = int(match.group(1)) * 1.0 / self.cfg.model_config.num_hidden_layers
                self.worker_init_status["layer_loadding"] = progress
                # NOTE(review): comparing a 0..1 progress fraction against
                # num_hidden_layers - 1 only matches when num_hidden_layers
                # is 1 — confirm the intended finish condition.
                if self.worker_init_status["layer_loadding"] == self.cfg.model_config.num_hidden_layers - 1:
                    self.worker_init_status["finished"] = True

    self.checking_worker_status_thread = threading.Thread(target=detect_thread, daemon=True)
    self.checking_worker_status_thread.start()

    # Display weight loading progress.
    with tqdm(total=100, desc="Loading Weights") as pbar:
        progress = 0
        while progress < 100:
            progress = int(self.worker_init_status.get("weight_loadding", 0) * 100)
            if self.worker_init_status.get("layer_loadding", 0) > 0 or self._worker_processes_ready():
                progress = 100
            pbar.update(progress - pbar.n)
            pbar.refresh()
            time.sleep(0.5)
            if self.worker_proc.poll() is not None:
                # Worker died while loading weights.
                return False

    # Display layer loading progress.
    with tqdm(total=100, desc="Loading Layers") as pbar:
        progress = 0
        while progress < 100:
            progress = int(self.worker_init_status.get("layer_loadding", 0) * 100)
            if self._worker_processes_ready():
                progress = 100
            pbar.update(progress - pbar.n)
            pbar.refresh()
            time.sleep(0.5)
            if self.worker_proc.poll() is not None:
                # Worker died while loading layers.
                return False

    self.worker_init_status["finished"] = True
    try:
        self.checking_worker_status_thread.join(timeout=1)
    except Exception:
        pass
    return True

View File

@@ -386,6 +386,7 @@ class CompletionOutput:
draft_token_ids: list[int] = None
text: Optional[str] = None
reasoning_content: Optional[str] = None
reasoning_token_num: Optional[int] = 0
tool_calls: Optional[ToolCall] = None
def to_dict(self):
@@ -404,6 +405,7 @@ class CompletionOutput:
"draft_token_ids": self.draft_token_ids,
"text": self.text,
"reasoning_content": self.reasoning_content,
"reasoning_token_num": self.reasoning_token_num,
}
@classmethod
@@ -425,6 +427,7 @@ class CompletionOutput:
f"decode_type={self.decode_type}, "
f"draft_token_ids={self.draft_token_ids}, "
f"reasoning_content={self.reasoning_content!r}, "
f"reasoning_token_num={self.reasoning_token_num}, "
f"logprobs={self.logprobs}, "
f"top_logprobs={self.top_logprobs}, "
f"draft_top_logprobs={self.draft_top_logprobs}, "

View File

@@ -58,7 +58,7 @@ class BaseDataProcessor(ABC):
def set_value(req, key, value):
value = getattr(self.generation_config, key, value)
if isinstance(req, dict):
if key not in req:
if key not in req or req[key] is None:
req[key] = value
else:
if req.get(key) is None:

View File

@@ -191,7 +191,7 @@ class ZmqServerBase(ABC):
return str(e), None
def recv_result_handle(self):
while True:
while self.running:
try:
with self.response_token_lock:
client, _, request_id = self.socket.recv_multipart(flags=zmq.NOBLOCK)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,849 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import os
import time
import unittest
from unittest.mock import MagicMock, Mock, patch
import numpy as np
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.common_engine import EngineService
MODEL_NAME = os.getenv("MODEL_PATH", "/path/to/models") + "/ERNIE-4.5-0.3B-Paddle"
class TestCommonEngine(unittest.TestCase):
    """Test case for EngineService functionality (lines 1215-1664)"""

    @classmethod
    def setUpClass(cls):
        """Set up EngineService for testing"""
        try:
            # Create engine args for testing
            engine_args = EngineArgs(
                model=MODEL_NAME,
                max_model_len=8192,
                tensor_parallel_size=1,
                # +10 offsets avoid port collisions with other test modules
                engine_worker_queue_port=int(os.getenv("FD_ENGINE_QUEUE_PORT", "6778")) + 10,
                cache_queue_port=int(os.getenv("FD_CACHE_QUEUE_PORT", "6779")) + 10,
            )

            # Create and start the engine service
            cls.cfg = engine_args.create_engine_config()
            cls.engine = EngineService(cls.cfg, start_queue=True, use_async_llm=True)

            # Start the engine service
            cls.engine.start()
        except Exception as e:
            print(f"Setting up EngineService failed: {e}")
            raise

    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests"""
        if hasattr(cls, "engine") and cls.engine is not None:
            try:
                cls.engine._exit_sub_services()
                print("Engine cleanup completed")
            except Exception as e:
                print(f"Error during engine cleanup: {e}")

    def setUp(self):
        """Set up before each test method"""
        print(f"Starting test: {self._testMethodName}")

    def tearDown(self):
        """Clean up after each test method"""
        print(f"Completed test: {self._testMethodName}")

    def test_exit_sub_services(self):
        """Test _exit_sub_services method (lines 1215-1291)"""
        # Test that _exit_sub_services can be called without error
        # Note: We won't actually call it since it would shut down the engine
        # Instead we'll test that the method exists and has expected attributes
        self.assertTrue(hasattr(self.engine, "_exit_sub_services"))
        self.assertTrue(callable(getattr(self.engine, "_exit_sub_services")))

        # Test that engine has expected attributes that would be cleaned up
        if hasattr(self.engine, "worker_proc"):
            self.assertIsNotNone(self.engine.worker_proc)

        # Verify running state
        self.assertTrue(self.engine.running)

    def test_worker_processes_ready(self):
        """Test _worker_processes_ready method (lines 1292-1299)"""
        # Test with real engine that should have worker_ready_signal
        if hasattr(self.engine, "worker_ready_signal"):
            result = self.engine._worker_processes_ready()
            # Result should be boolean
            self.assertIsInstance(result, bool)
        else:
            self.skipTest("worker_ready_signal not available")

    def test_init_worker_signals(self):
        """Test _init_worker_signals method (lines 1301-1361)"""
        # Since engine is already started, signals should be initialized
        self.assertTrue(hasattr(self.engine, "worker_ready_signal"))
        self.assertTrue(hasattr(self.engine, "loaded_model_signal"))

        # Test that signals have expected properties
        if hasattr(self.engine, "worker_ready_signal"):
            self.assertIsNotNone(self.engine.worker_ready_signal)
        if hasattr(self.engine, "loaded_model_signal"):
            self.assertIsNotNone(self.engine.loaded_model_signal)

    def test_setting_environ_variables(self):
        """Test _setting_environ_variables method (lines 1362-1408)"""
        result = self.engine._setting_environ_variables()

        # Check that result is a string and contains expected variables
        self.assertIsInstance(result, str)
        self.assertIn("ENABLE_FASTDEPLOY_LOAD_MODEL_CONCURRENCY=0", result)
        self.assertIn("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python", result)
        self.assertIn("FLAGS_use_append_attn=1", result)
        self.assertIn("NCCL_ALGO=Ring", result)

    def test_start_worker_service(self):
        """Test _start_worker_service method (lines 1409-1517)"""
        # Since engine is already started, we can test that worker process exists
        if hasattr(self.engine, "worker_proc") and self.engine.worker_proc:
            # Worker process should be running
            self.assertIsNotNone(self.engine.worker_proc)
            # Process should be alive (poll returns None if still running)
            poll_result = self.engine.worker_proc.poll()
            if poll_result is not None:
                self.skipTest("Worker process is not running")
        else:
            self.skipTest("Worker process not available")

    def test_stop_profile(self):
        """Test _stop_profile method (lines 1519-1532)"""
        # Test method exists and is callable
        self.assertTrue(hasattr(self.engine, "_stop_profile"))
        self.assertTrue(callable(getattr(self.engine, "_stop_profile")))

        # We won't actually call it as it modifies engine state
        # Just verify the do_profile attribute exists
        self.assertTrue(hasattr(self.engine, "do_profile"))

    def test_check_health(self):
        """Test check_health method (lines 1533-1544)"""
        if hasattr(self.engine, "worker_healthy_live_signal"):
            is_healthy, message = self.engine.check_health(time_interval_threashold=30)
            # Should return tuple of (bool, str)
            self.assertIsInstance(is_healthy, bool)
            self.assertIsInstance(message, str)
        else:
            self.skipTest("worker_healthy_live_signal not available")

    def test_launch_components(self):
        """Test launch_components method (lines 1545-1605)"""
        # Method should exist and be callable
        self.assertTrue(hasattr(self.engine, "launch_components"))
        self.assertTrue(callable(getattr(self.engine, "launch_components")))

        # Test that scheduler exists (should be created during start)
        if hasattr(self.engine, "scheduler"):
            self.assertIsNotNone(self.engine.scheduler)

    def test_check_worker_initialize_status(self):
        """Test check_worker_initialize_status method (lines 1606-1663)"""
        # Method should exist and be callable
        self.assertTrue(hasattr(self.engine, "check_worker_initialize_status"))
        self.assertTrue(callable(getattr(self.engine, "check_worker_initialize_status")))

        # Test that worker_init_status exists
        if hasattr(self.engine, "worker_init_status"):
            self.assertIsInstance(self.engine.worker_init_status, dict)

    def test_engine_started_successfully(self):
        """Test that engine started successfully and has expected state"""
        # Verify engine is running
        self.assertTrue(self.engine.running)

        # Verify data processor was created
        if hasattr(self.engine, "data_processor"):
            self.assertIsNotNone(self.engine.data_processor)

        # Verify IPC signal suffix is set
        if hasattr(self.engine, "ipc_signal_suffix"):
            self.assertIsNotNone(self.engine.ipc_signal_suffix)


if __name__ == "__main__":
    unittest.main()
class TestCommonEngineAdditionalCoverage(unittest.TestCase):
"""Additional unit tests focusing on branch coverage for common_engine.py
These tests heavily mock subprocess/threading/IPC to avoid starting real workers
and to drive specific code paths that were previously uncovered.
"""
def _make_cfg(self, **kwargs):
    """Build a small FDConfig suitable for unit tests (port checks disabled)."""
    args = EngineArgs(
        model=MODEL_NAME,
        max_model_len=128,
        tensor_parallel_size=1,
        # give unique ports to avoid collision with other tests
        engine_worker_queue_port=str(int(os.getenv("FD_ENGINE_QUEUE_PORT", "6778")) + 20),
        cache_queue_port=str(int(os.getenv("FD_CACHE_QUEUE_PORT", "6779")) + 20),
        enable_prefix_caching=True,
        **kwargs,
    )
    # Keep batch tokens small to satisfy FDConfig checks:
    # max_num_batched_tokens <= max_model_len * max_num_seqs
    if getattr(args, "max_num_batched_tokens", None) is None:
        args.max_num_batched_tokens = 128
    # Always enable chunked prefill in tests to avoid another strict check
    args.enable_chunked_prefill = True
    # If DP > 1, we must provide enough engine_worker_queue_port for each dp index
    dp = kwargs.get("data_parallel_size", args.data_parallel_size)
    base = int(args.engine_worker_queue_port.split(",")[0])
    if dp and dp > 1:
        ports = ",".join(str(base + i) for i in range(dp))
        args.engine_worker_queue_port = ports
    return args.create_engine_config(port_availability_check=False)
def _stub_processor(self):
class _Tok:
def __init__(self):
self.vocab = {"</think>": 42, "\n": 10, "<|IMAGE_PLACEHOLDER|>": 9}
def get_vocab(self):
return self.vocab
class _Proc:
def __init__(self):
self.tokenizer = _Tok()
self.eos_token_id_len = 1
self.pad_token_id = 0
return _Proc()
def test_start_prefill_branch_cache_manager_and_worker_dead(self):
    """Cover lines 184-185, 194-197, 221, 226-227 in start()."""
    # For prefill + local scheduler the core code now requires a router.
    # Also, with the newer CacheConfig semantics we must ensure that
    # prefill_kvcache_block_num (num_gpu_blocks_override * kv_cache_ratio)
    # is >= max_block_num_per_seq; use 3 blocks so that with the default
    # kv_cache_ratio=0.75 we still satisfy the assertion.
    with patch("fastdeploy.engine.args_utils.envs.ENABLE_V1_KVCACHE_SCHEDULER", 0):
        cfg = self._make_cfg(
            splitwise_role="prefill",
            num_gpu_blocks_override=3,
            router="0.0.0.0:30000",
        )

        # Patch EngineWorkerQueue before EngineService ctor to avoid real IPC
        class DummyQ:
            def __init__(self, *a, **k):
                self.available_prefill_instances = type("X", (), {"put": lambda *_: None})()

            def get_server_port(self):
                return 0

            def cleanup(self):
                pass

            def num_tasks(self):
                return 0

            def num_cache_infos(self):
                return 0

            def disaggregate_queue_empty(self):
                return True

            def get_disaggregated_tasks(self):
                return []

        with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
            eng = EngineService(cfg, start_queue=False, use_async_llm=True)
            # Patch heavy pieces
            eng.create_data_processor = lambda: setattr(eng, "data_processor", self._stub_processor())
            eng._process_splitwise_task = lambda: None
            eng._schedule_request_to_worker = lambda: None
            eng._schedule_request_to_worker_v1 = lambda: None
            started_cache = {}

            def fake_start_cache(device_ids, suffix):
                started_cache["called"] = True
                # return a list to mimic processes
                return [object()]

            eng.start_cache_service = fake_start_cache

            # Signals: make loaded_model_signal ready immediately; include launched_cache_manager_signal
            class Sig:
                def __init__(self, v=0):
                    self.value = np.array([v], dtype=np.int32)

                def clear(self):
                    pass

            def fake_init_signals():
                eng.worker_ready_signal = Sig(0)
                eng.loaded_model_signal = Sig(1)  # ready -> skip wait loop
                eng.launched_cache_manager_signal = Sig(0)

            eng._init_worker_signals = fake_init_signals
            # Worker start stub and initialization status -> False to trigger error path
            eng._start_worker_service = lambda: Mock(stdout=Mock(), poll=lambda: None)
            eng.check_worker_initialize_status = lambda: False
            with patch("fastdeploy.engine.common_engine.time.sleep", lambda *_: None):
                # Avoid starting token processor loop
                eng.token_processor.run = lambda: None
                ok = eng.start(async_llm_pid=12345)
            # start() returns False on failure
            self.assertFalse(ok)
            # cache manager started before workers (lines 184-185)
            self.assertTrue(started_cache.get("called", False))
            # launched_cache_manager_signal set (line 221)
            self.assertEqual(int(eng.launched_cache_manager_signal.value[0]), 1)
            # avoid atexit finalizer
            if hasattr(eng, "_finalizer"):
                try:
                    eng._finalizer.detach()
                except Exception:
                    pass
def test_start_mixed_branch_cache_after_load_and_zmq(self):
    """Cover lines 215-217 and 231 in start().

    In the mixed role the cache service is started after the model loads,
    and start() forwards the caller pid to start_zmq_service.
    """
    cfg = self._make_cfg(splitwise_role="mixed", num_gpu_blocks_override=2)

    # IPC-free stand-in for EngineWorkerQueue (see prefill test above).
    class DummyQ:
        def __init__(self, *a, **k):
            self.available_prefill_instances = type("X", (), {"put": lambda *_: None})()

        def get_server_port(self):
            return 0

        def cleanup(self):
            pass

        def num_tasks(self):
            return 0

        def num_cache_infos(self):
            return 0

        def disaggregate_queue_empty(self):
            return True

        def get_disaggregated_tasks(self):
            return []

    with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
        eng = EngineService(cfg, start_queue=False, use_async_llm=True)
        eng.create_data_processor = lambda: setattr(eng, "data_processor", self._stub_processor())
        eng._process_splitwise_task = lambda: None
        eng._schedule_request_to_worker = lambda: None
        eng._schedule_request_to_worker_v1 = lambda: None
        started_cache = {}

        def fake_start_cache(device_ids, suffix):
            started_cache["called"] = True
            return [object()]

        eng.start_cache_service = fake_start_cache

        class Sig:
            def __init__(self, v=0):
                self.value = np.array([v], dtype=np.int32)

            def clear(self):
                pass

        def fake_init_signals():
            eng.worker_ready_signal = Sig(0)
            eng.loaded_model_signal = Sig(1)
            eng.launched_cache_manager_signal = Sig(0)

        eng._init_worker_signals = fake_init_signals
        eng._start_worker_service = lambda: Mock(stdout=Mock(), poll=lambda: None)
        eng.check_worker_initialize_status = lambda: True
        # Capture the pid passed into start_zmq_service.
        zmq_called = {}
        eng.start_zmq_service = lambda pid: zmq_called.setdefault("pid", pid)
        with patch("fastdeploy.engine.common_engine.time.sleep", lambda *_: None):
            eng.token_processor.run = lambda: None
            eng.start(async_llm_pid=8888)
        self.assertTrue(started_cache.get("called", False))  # lines 215-217
        self.assertEqual(zmq_called.get("pid"), 8888)  # line 231
        if hasattr(eng, "_finalizer"):
            try:
                eng._finalizer.detach()
            except Exception:
                pass
def test_insert_zmq_task_error_logging(self):
    """Cover lines 934-935 and 937 in _insert_zmq_task_to_scheduler.

    Drives both error-logging branches by making the recv server return
    error strings instead of a task payload.
    """
    cfg = self._make_cfg(splitwise_role="mixed")

    class DummyQ:
        def __init__(self, *a, **k):
            self.available_prefill_instances = type("X", (), {"put": lambda *_: None})()

        def get_server_port(self):
            return 0

        def cleanup(self):
            pass

    with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
        eng = EngineService(cfg, start_queue=False, use_async_llm=False)
        eng.running = True

        # Fake ZMQ receiver that always yields the given error message.
        class DummyRecv:
            def __init__(self, msg):
                self.msg = msg

            def receive_json_once(self, block):
                return self.msg, None

            def close(self):
                pass

        # Case 1: context terminated -> info branch
        eng.recv_request_server = DummyRecv("Context was terminated")
        with patch.object(eng, "llm_logger") as _:
            eng._insert_zmq_task_to_scheduler()
        # Case 2: other error -> error branch
        eng.recv_request_server = DummyRecv("Other Error")
        with patch.object(eng, "llm_logger") as _:
            eng._insert_zmq_task_to_scheduler()
        if hasattr(eng, "_finalizer"):
            try:
                eng._finalizer.detach()
            except Exception:
                pass
def test_exit_sub_services_cleanup_paths(self):
    """Cover lines 1312-1340, 1350-1354 in _exit_sub_services.

    Calls cleanup three times to hit: worker kill failure, mixed
    success/failure killing cache manager processes, and the
    manager.shutdown warning path.
    """
    cfg = self._make_cfg(splitwise_role="mixed")

    class DummyQ:
        def __init__(self, *a, **k):
            self.available_prefill_instances = type("X", (), {"put": lambda *_: None})()

        def get_server_port(self):
            return 0

        def cleanup(self):
            pass

    with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
        eng = EngineService(cfg, start_queue=False, use_async_llm=True)

        # attach stubs used by cleanup
        class Sig:
            def __init__(self):
                self.value = np.array([0], dtype=np.int32)

            def clear(self):
                pass

        eng.worker_ready_signal = Sig()
        eng.loaded_model_signal = Sig()
        eng.exist_task_signal = Sig()
        eng.exist_swapped_task_signal = Sig()
        eng.worker_healthy_live_signal = Sig()
        eng.cache_ready_signal = Sig()
        eng.swap_space_ready_signal = Sig()
        eng.exist_prefill_task_signal = Sig()
        eng.model_weights_status_signal = Sig()
        eng.prefix_tree_status_signal = Sig()
        eng.kv_cache_status_signal = Sig()
        eng.send_response_server = Mock()
        eng.recv_request_server = Mock()
        eng.recv_control_cmd_server = Mock()
        # ensure cache manager control flags exist before first call
        eng.resource_manager.cache_manager.shm_cache_task_flag_broadcast = Mock(clear=lambda: None)
        eng.resource_manager.cache_manager.cache_ready_signal = Mock(clear=lambda: None)
        eng.cache_manager_processes = []
        # worker_proc kill raises -> cover 1312-1313
        eng.worker_proc = MagicMock(pid=1001)
        with patch("fastdeploy.engine.common_engine.os.getpgid", side_effect=RuntimeError("boom")):
            eng._exit_sub_services()

        # Prepare cache manager processes to hit both normal and exception branch
        class DummyCacheMgr:
            def __init__(self, pid, raise_on_kill=False):
                self.pid = pid
                self.raise_on_kill = raise_on_kill

        eng.cache_manager_processes = [DummyCacheMgr(2001, False), DummyCacheMgr(2002, True)]
        eng.resource_manager.cache_manager.shm_cache_task_flag_broadcast = Mock(clear=lambda: None)
        eng.resource_manager.cache_manager.cache_ready_signal = Mock(clear=lambda: None)

        def fake_getpgid(pid):
            return pid

        # killpg fails only for pid 2002 -> exercises the exception branch.
        def fake_killpg(pid, sig):
            if pid == 2002:
                raise RuntimeError("kill fail")

        # cache_task_queue with cleanup
        eng.cache_task_queue = Mock()
        eng.cache_task_queue.cleanup = Mock()
        eng.dp_processed = [Mock(pid=3001, join=lambda: None)]
        eng.dp_engine_worker_queue_server = [Mock(cleanup=lambda: None)]
        with (
            patch("fastdeploy.engine.common_engine.os.getpgid", side_effect=fake_getpgid),
            patch("fastdeploy.engine.common_engine.os.killpg", side_effect=fake_killpg),
        ):
            eng._exit_sub_services()

        # Now cover manager.shutdown warning path (no cleanup attribute)
        class DummyMgr:
            def __init__(self):
                self.manager = Mock(shutdown=Mock(side_effect=RuntimeError("shutdown fail")))

        eng.cache_task_queue = DummyMgr()
        eng._exit_sub_services()
        if hasattr(eng, "_finalizer"):
            try:
                eng._finalizer.detach()
            except Exception:
                pass
def test_setting_environ_variables_v1_prefill_mm(self):
    """Cover lines 1476-1485 in _setting_environ_variables."""
    # For prefill + local scheduler the core code now requires a router
    # and ENABLE_V1_KVCACHE_SCHEDULER=0 when using the default IPC protocol.
    with patch("fastdeploy.engine.args_utils.envs.ENABLE_V1_KVCACHE_SCHEDULER", 0):
        cfg = self._make_cfg(splitwise_role="prefill", router="0.0.0.0:30000")
        # Multi-modal model exercises the extra FLAGS branch.
        cfg.model_config.enable_mm = True

        class DummyQ:
            def __init__(self, *a, **k):
                pass

        with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
            eng = EngineService(cfg, start_queue=False, use_async_llm=True)
            with patch("fastdeploy.engine.common_engine.envs.ENABLE_V1_KVCACHE_SCHEDULER", True):
                prefix = eng._setting_environ_variables()
            self.assertIn("FLAGS_use_pd_disaggregation_per_chunk=1", prefix)
            self.assertIn("FLAGS_fmt_write_cache_completed_signal=1", prefix)
            self.assertIn("FLAGS_max_partition_size=1024", prefix)
            if hasattr(eng, "_finalizer"):
                try:
                    eng._finalizer.detach()
                except Exception:
                    pass
def test_start_worker_service_cmd_build(self):
    """Cover 1517, 1526, 1568, 1592, 1595 by building the worker command with mocks.

    Intercepts subprocess.Popen so no worker is actually spawned; the
    assembled shell command string is captured and inspected instead.
    """
    with patch("fastdeploy.config.get_host_ip", return_value="127.0.0.1"):
        cfg = self._make_cfg(splitwise_role="mixed", num_gpu_blocks_override=4, ips=["127.0.0.1", "127.0.0.2"])
        # Make model multi-modal so env var branch already covered above; here not required
        cfg.structured_outputs_config.logits_processors = ["A", "B"]

        class DummyQ:
            def __init__(self, *a, **k):
                pass

        with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
            eng = EngineService(cfg, start_queue=False, use_async_llm=True)
            eng.data_processor = self._stub_processor()
            captured = {"cmd": None}

            class DummyProc:
                def __init__(self):
                    self.stdout = None

                def poll(self):
                    return None

            def fake_popen(cmd, stdout, shell, preexec_fn):
                captured["cmd"] = cmd
                return DummyProc()

            with patch("fastdeploy.engine.common_engine.subprocess.Popen", side_effect=fake_popen):
                with patch("fastdeploy.engine.common_engine.llm_logger"):
                    p = eng._start_worker_service()
            self.assertIsNotNone(p)
            self.assertIsInstance(captured["cmd"], str)
            # logits processors added (1568)
            self.assertIn("--logits-processors A B", captured["cmd"])  # type: ignore
            # num_gpu_blocks_override added (1592)
            self.assertIn("--num_gpu_blocks_override 4", captured["cmd"])  # type: ignore
            # ips/nnodes added when nnode > 1 (1595)
            self.assertIn("--nnodes 2", captured["cmd"])  # type: ignore
            if hasattr(eng, "_finalizer"):
                try:
                    eng._finalizer.detach()
                except Exception:
                    pass
def test_check_health_unhealthy(self):
    """Cover line 1628: unhealthy worker."""
    cfg = self._make_cfg(splitwise_role="mixed")

    class DummyQ:
        def __init__(self, *a, **k):
            pass

    with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
        eng = EngineService(cfg, start_queue=False, use_async_llm=True)

        class Sig:
            def __init__(self, v):
                self.value = np.array([v], dtype=np.int32)

        # set worker live time far past threshold
        eng.worker_healthy_live_signal = Sig(int(time.time()) - 1000)
        # NOTE: "threashold" is the spelling of the keyword in check_health's
        # signature; keep it in sync with common_engine.py.
        ok, msg = eng.check_health(time_interval_threashold=1)
        self.assertFalse(ok)
        self.assertIn("Not Healthy".lower(), msg.lower())
        if hasattr(eng, "_finalizer"):
            try:
                eng._finalizer.detach()
            except Exception:
                pass
def test_launch_components_expert_parallel(self):
    """Cover 1635-1638, 1660-1676, 1684-1703 in launch_components()."""
    # For prefill + local scheduler the core code now requires a router
    # and ENABLE_V1_KVCACHE_SCHEDULER=0 when using the default IPC protocol.
    with patch("fastdeploy.engine.args_utils.envs.ENABLE_V1_KVCACHE_SCHEDULER", 0):
        cfg = self._make_cfg(
            splitwise_role="prefill",
            # enable expert parallel and dp > 1 to go into the branch
            data_parallel_size=2,
            enable_expert_parallel=True,
            router="0.0.0.0:30000",
        )

        # Provide EngineWorkerQueue stub for ctor
        class DummyQ:
            def __init__(self, *a, **k):
                self.available_prefill_instances = type("X", (), {"put": lambda *_: None})()

            def get_server_port(self):
                return 0

            def cleanup(self):
                pass

        with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
            eng = EngineService(cfg, start_queue=True, use_async_llm=True)
            # Init signals to create launched_expert_service_signal
            with patch("fastdeploy.engine.common_engine.envs.FD_ENABLE_MULTI_API_SERVER", False):
                eng.ipc_signal_suffix = cfg.parallel_config.engine_worker_queue_port[0]
                eng._init_worker_signals()
            # Don't create real queues/processes
            with (
                patch("fastdeploy.engine.common_engine.EngineWorkerQueue") as FakeQ,
                patch("fastdeploy.engine.common_engine.multiprocessing.Process") as FakeP,
            ):
                # Fake queue instances with cleanup
                FakeQ.return_value = Mock(cleanup=lambda: None)

                # When starting process, immediately mark the signal as 1 to break waiting loop
                def start_side_effect(*args, **kwargs):
                    # set value for dp id 1
                    eng.launched_expert_service_signal.value[1] = 1

                proc_instance = Mock(start=start_side_effect)
                FakeP.return_value = proc_instance
                # Avoid scheduler doing real work
                eng.scheduler.start = lambda *a, **k: None
                with patch("fastdeploy.engine.common_engine.time.sleep", lambda *_: None):
                    eng.launch_components()
                # Verify expert service branch executed
                self.assertTrue(hasattr(eng, "dp_processed"))
                self.assertGreaterEqual(len(eng.dp_processed), 1)
            if hasattr(eng, "_finalizer"):
                try:
                    eng._finalizer.detach()
                except Exception:
                    pass
def test_check_worker_initialize_status_progress(self):
    """Cover 1710-1762 by simulating stdout and ready signals."""
    cfg = self._make_cfg(splitwise_role="mixed")

    class DummyQ:
        def __init__(self, *a, **k):
            pass

    with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
        eng = EngineService(cfg, start_queue=False, use_async_llm=True)
        # Fake worker process stdout content that matches regexes
        lines = [
            b"Loading checkpoint shards: 1\n",
            b"Start load layer 5\n",
        ]

        # Minimal stand-in for the worker subprocess: iterable stdout, never exits.
        class DummyProc:
            def __init__(self, it):
                self._it = iter(it)

            @property
            def stdout(self):
                return self._it

            def poll(self):
                return None

        eng.worker_proc = DummyProc(lines)
        eng.worker_init_status = {}
        eng.cfg.model_config.num_hidden_layers = 8

        # worker_ready_signal makes _worker_processes_ready() return True
        class Sig:
            def __init__(self):
                self.value = np.array([1], dtype=np.int32)

        eng.worker_ready_signal = Sig()

        # Replace tqdm and sleep for fast execution
        class DummyPbar:
            def __init__(self):
                self.n = 0

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def update(self, delta=0, *args, **kwargs):
                try:
                    self.n += int(delta)
                except Exception:
                    self.n = 0

            def refresh(self):
                pass

        with patch("fastdeploy.engine.common_engine.tqdm", lambda *a, **k: DummyPbar()):
            with patch("fastdeploy.engine.common_engine.time.sleep", lambda *_: None):
                ok = eng.check_worker_initialize_status()
        self.assertTrue(ok)
        if hasattr(eng, "_finalizer"):
            try:
                eng._finalizer.detach()
            except Exception:
                pass
def test_worker_processes_ready_false(self):
    """Cover line 1382 returning False."""
    cfg = self._make_cfg()

    class DummyQ:
        def __init__(self, *a, **k):
            pass

    with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
        eng = EngineService(cfg, start_queue=False, use_async_llm=True)

        class Sig:
            def __init__(self):
                # less than worker_num_per_node
                self.value = np.array([0], dtype=np.int32)

        eng.worker_ready_signal = Sig()
        self.assertFalse(eng._worker_processes_ready())
        if hasattr(eng, "_finalizer"):
            try:
                eng._finalizer.detach()
            except Exception:
                pass
def test_init_worker_signals_profile_iluvatar(self):
    """Cover line 1434 by forcing iluvatar custom device and do_profile=True."""
    # do_profile=True when num_gpu_blocks_override is None
    cfg = self._make_cfg(num_gpu_blocks_override=None)

    class DummyQ:
        def __init__(self, *a, **k):
            pass

    with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
        eng = EngineService(cfg, start_queue=False, use_async_llm=True)
        eng.ipc_signal_suffix = cfg.parallel_config.engine_worker_queue_port[0]
        with patch("fastdeploy.engine.common_engine.paddle.is_compiled_with_custom_device", return_value=True):
            eng._init_worker_signals()
        # signal should exist
        self.assertTrue(hasattr(eng, "get_profile_block_num_signal"))
        if hasattr(eng, "_finalizer"):
            try:
                eng._finalizer.detach()
            except Exception:
                pass
def test_launch_components_dp_mode(self):
    """Cover 1648-1652 branch for DP scheduler mode."""
    # When ENABLE_V1_KVCACHE_SCHEDULER=1 the IPC cache-transfer protocol
    # is no longer supported; force it to 0 here to avoid the
    # NotImplementedError raised in EngineArgs.__post_init__ so we can
    # still exercise the DP branch of launch_components.
    with patch("fastdeploy.engine.args_utils.envs.ENABLE_V1_KVCACHE_SCHEDULER", 0):
        cfg = self._make_cfg(
            splitwise_role="prefill",
            data_parallel_size=2,
            scheduler_name="dp",
        )

        class DummyQ:
            def __init__(self, *a, **k):
                self.available_prefill_instances = type("X", (), {"put": lambda *_: None})()

        with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
            eng = EngineService(cfg, start_queue=False, use_async_llm=True)
            # Patch scheduler.start so it doesn't do heavy work
            eng.scheduler.start = Mock()
            eng.launch_components()
            eng.scheduler.start.assert_called()
            if hasattr(eng, "_finalizer"):
                try:
                    eng._finalizer.detach()
                except Exception:
                    pass

View File

@@ -0,0 +1,38 @@
"""
Simple tests for ZmqServerBase.recv_result_handle to cover the startup log line.
"""
import unittest
from fastdeploy.inter_communicator.zmq_server import ZmqServerBase
class _DummyServer(ZmqServerBase):
    """Concrete ZmqServerBase whose abstract hooks are inert no-ops.

    No real ZMQ socket is ever opened; ``running`` stays False so that
    ``recv_result_handle`` emits its startup log and returns without
    ever entering the receive loop.
    """

    def __init__(self):
        super().__init__()
        # Keep the loop guard off so recv_result_handle exits immediately.
        self.running = False
        self.socket = None

    def _create_socket(self):  # pragma: no cover - never invoked in this test
        return None

    def close(self):  # pragma: no cover - never invoked in this test
        pass
class TestZmqServerRecvResultHandle(unittest.TestCase):
    """Smoke test for ZmqServerBase.recv_result_handle."""

    def test_recv_result_handle_startup_log(self):
        """Just invoke recv_result_handle to execute the first log line (L123)."""
        server = _DummyServer()
        result = server.recv_result_handle()
        # With running=False the loop is skipped and the method yields None.
        self.assertIsNone(result)
# Allow running this test module directly with `python <file>`.
if __name__ == "__main__":
    unittest.main()