[FDConfig] remove engine client args, use fd_config instead (#5217)

* [refactor] remove engine client args, use fd_config instead

* [chore] update

* [fix] fix

* [fix] fix

* [chore] rename config to fd_config

* [fix] fix run_batch

* [ci] add ci case for engine client

---------

Co-authored-by: Jiaxin Sui <95567040+plusNew001@users.noreply.github.com>
Author: Yonghua Li
Date: 2025-11-28 17:20:54 +08:00 (committed by GitHub)
Commit: a535050b11 (parent: 73886204d4)
4 changed files with 70 additions and 1561 deletions
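
In short, the servers now build a single FDConfig from the CLI arguments and hand it to EngineClient instead of passing a dozen individual keyword arguments. A minimal sketch of the new call pattern, pieced together from the diffs below (`pid`, `port`, and `args` are placeholders; imports are omitted):

```python
# Build the unified config from the CLI args, then hand it to the client.
# EngineClient now reads everything else (model, parallel, cache, scheduler
# settings) from fd_config instead of taking separate constructor arguments.
engine_args = EngineArgs.from_cli_args(args)
fd_config = engine_args.create_engine_config(port_availability_check=False)

engine_client = EngineClient(
    pid=pid,        # placeholder
    port=port,      # placeholder
    fd_config=fd_config,
    workers=args.workers,
)
```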


@@ -26,6 +26,7 @@ import numpy as np
from filelock import FileLock
from fastdeploy import envs
from fastdeploy.config import FDConfig
from fastdeploy.entrypoints.openai.utils import DealerConnectionManager
from fastdeploy.envs import FD_SUPPORT_MAX_CONNECTIONS
from fastdeploy.eplb.utils import RedundantExpertWorkload
@@ -55,62 +56,41 @@ class EngineClient:
EngineClient is a class that handles the communication between the client and the server.
"""
def __init__(
self,
model_name_or_path,
tokenizer,
max_model_len,
tensor_parallel_size,
pid,
port,
limit_mm_per_prompt,
mm_processor_kwargs,
config,
reasoning_parser=None,
data_parallel_size=1,
enable_logprob=False,
workers=1,
tool_parser=None,
enable_prefix_caching=None,
splitwise_role=None,
max_processor_cache=0,
):
self.config = config
self.model_config = config.model_config
self.enable_mm = self.model_config.enable_mm
enable_processor_cache = self.enable_mm and max_processor_cache > 0
def __init__(self, pid: int | str, port: int | str, fd_config: FDConfig, workers: int = 1):
self.fd_config = fd_config
self.tensor_parallel_size = self.fd_config.parallel_config.tensor_parallel_size
self.enable_mm = self.fd_config.model_config.enable_mm
input_processor = InputPreprocessor(
self.model_config,
reasoning_parser,
limit_mm_per_prompt,
mm_processor_kwargs,
tool_parser,
enable_processor_cache,
self.fd_config.model_config,
self.fd_config.structured_outputs_config.reasoning_parser,
self.fd_config.limit_mm_per_prompt,
self.fd_config.mm_processor_kwargs,
self.fd_config.tool_parser,
self.enable_mm and self.fd_config.cache_config.max_processor_cache > 0,
)
self.enable_logprob = enable_logprob
self.reasoning_parser = reasoning_parser
self.enable_logprob = self.fd_config.model_config.enable_logprob
self.data_processor = input_processor.create_processor()
self.max_model_len = max_model_len
self.enable_prefix_caching = enable_prefix_caching
self.enable_splitwise = splitwise_role != "mixed"
max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
self.max_model_len = self.fd_config.model_config.max_model_len
self.enable_prefix_caching = self.fd_config.cache_config.enable_prefix_caching
self.enable_splitwise = self.fd_config.scheduler_config.splitwise_role != "mixed"
self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
if self.enable_mm and self.enable_prefix_caching:
from fastdeploy.cache_manager.cache_data import (
is_mm_model_disable_prefix_cache,
)
self.disable_prefix_mm = is_mm_model_disable_prefix_cache(self.model_config)
self.disable_prefix_mm = is_mm_model_disable_prefix_cache(self.fd_config.model_config)
if tensor_parallel_size <= max_chips_per_node:
if self.tensor_parallel_size <= self.max_chips_per_node:
self.is_master = True
else:
self.is_master = False
if self.config.eplb_config.enable_eplb:
if self.fd_config.eplb_config.enable_eplb:
self.init_eplb_signals(ipc_signal_suffix=port)
array_size = min(max_chips_per_node, tensor_parallel_size)
array_size = min(self.max_chips_per_node, self.tensor_parallel_size)
self.worker_healthy_live_recorded_time_array = np.zeros(shape=[array_size], dtype=np.int32)
self.worker_healthy_live_signal = IPCSignal(
name="worker_healthy_live_signal",
@@ -154,7 +134,7 @@ class EngineClient:
"""
Initialize eplb signals.
"""
if self.config.parallel_config.tensor_parallel_rank != 0:
if self.fd_config.parallel_config.tensor_parallel_rank != 0:
# only TP rank 0 need to init eplb signals, rank 0 manage all EPLB signals for all TP ranks
return
@@ -164,7 +144,7 @@ class EngineClient:
self.signal_update_weight_from_disk_array_list = []
self.update_weight_from_disk_result_list = []
dp_ipc_signal_suffix = f"{ipc_signal_suffix}_dp{self.config.parallel_config.local_data_parallel_id}"
dp_ipc_signal_suffix = f"{ipc_signal_suffix}_dp{self.fd_config.parallel_config.local_data_parallel_id}"
rearrange_experts_status = np.zeros([1], dtype=np.int32)
self.rearrange_experts_signal = IPCSignal(
name="rearrange_experts_status",
@@ -185,7 +165,7 @@ class EngineClient:
self.shm_rearrange_experts_ips_list = IPCSignal(
name="rearrange_experts_ips_list",
shm_size=self.config.eplb_config.redundant_expert_ip_shm_size,
shm_size=self.fd_config.eplb_config.redundant_expert_ip_shm_size,
suffix=dp_ipc_signal_suffix,
create=False,
)
@@ -199,7 +179,7 @@ class EngineClient:
create=False,
)
for tp_rank_id in range(self.config.parallel_config.tensor_parallel_size):
for tp_rank_id in range(self.tensor_parallel_size):
tp_ipc_signal_suffix = f"{dp_ipc_signal_suffix}_tp{tp_rank_id}"
signal_clear_experts_token_stats = np.zeros([1], dtype=np.int32)
self.signal_clear_experts_token_stats_list.append(
@@ -235,7 +215,7 @@ class EngineClient:
)
experts_token_stats = np.zeros(
(self.config.model_config.num_hidden_layers, self.config.model_config.moe_num_experts),
(self.fd_config.model_config.num_hidden_layers, self.fd_config.model_config.moe_num_experts),
dtype=np.int32,
)
self.expert_tokens_stats_array_list.append(
@@ -593,7 +573,7 @@ class EngineClient:
Returns:
tuple: response body, status code
"""
eplb_config = self.config.eplb_config
eplb_config = self.fd_config.eplb_config
if not eplb_config.enable_eplb:
content = {"code": 1, "msg": "redundant expert is disabled"}
status_code = HTTPStatus.BAD_REQUEST
@@ -607,10 +587,10 @@ class EngineClient:
status_code = HTTPStatus.UNAUTHORIZED
return content, status_code
if self.config.parallel_config.tensor_parallel_rank != 0:
if self.fd_config.parallel_config.tensor_parallel_rank != 0:
content = {
"code": 1,
"msg": f"actual rank {self.config.parallel_config.tensor_parallel_rank}, expect rank 0",
"msg": f"actual rank {self.fd_config.parallel_config.tensor_parallel_rank}, expect rank 0",
}
status_code = HTTPStatus.BAD_REQUEST
return content, status_code
@@ -663,10 +643,10 @@ class EngineClient:
status_code = HTTPStatus.OK
return content, status_code
elif action == "update_weight_from_tensor":
if self.config.scheduler_config.splitwise_role != "prefill" and content is None:
if self.fd_config.scheduler_config.splitwise_role != "prefill" and content is None:
content = {
"code": 1,
"msg": f"actual role {self.config.scheduler_config.splitwise_role}, expect role prefill",
"msg": f"actual role {self.fd_config.scheduler_config.splitwise_role}, expect role prefill",
}
status_code = HTTPStatus.BAD_REQUEST
if self.rearrange_experts_signal.value[0] != RearrangeExpertStatus.LOAD_SUCC.value and content is None:
@@ -695,7 +675,7 @@ class EngineClient:
Returns:
tuple: response body, status code
"""
eplb_config = self.config.eplb_config
eplb_config = self.fd_config.eplb_config
if not eplb_config.enable_eplb:
content = {"code": 1, "msg": "redundant expert is disabled"}
status_code = HTTPStatus.BAD_REQUEST
@@ -709,10 +689,10 @@ class EngineClient:
status_code = HTTPStatus.UNAUTHORIZED
return content, status_code
if self.config.parallel_config.tensor_parallel_rank != 0:
if self.fd_config.parallel_config.tensor_parallel_rank != 0:
content = {
"code": 1,
"msg": f"actual rank {self.config.parallel_config.tensor_parallel_rank}, expect rank 0",
"msg": f"actual rank {self.fd_config.parallel_config.tensor_parallel_rank}, expect rank 0",
}
status_code = HTTPStatus.BAD_REQUEST
return content, status_code
@@ -737,7 +717,7 @@ class EngineClient:
tuple: response body, status code
"""
content, status_code = None, HTTPStatus.OK
eplb_config = self.config.eplb_config
eplb_config = self.fd_config.eplb_config
if not eplb_config.enable_eplb:
content = {"code": 1, "msg": "redundant expert is disabled"}
@@ -752,10 +732,10 @@ class EngineClient:
status_code = HTTPStatus.UNAUTHORIZED
return content, status_code
if self.config.parallel_config.tensor_parallel_rank != 0:
if self.fd_config.parallel_config.tensor_parallel_rank != 0:
content = {
"code": 1,
"msg": f"actual rank {self.config.parallel_config.tensor_parallel_rank}, expect rank 0",
"msg": f"actual rank {self.fd_config.parallel_config.tensor_parallel_rank}, expect rank 0",
}
status_code = HTTPStatus.BAD_REQUEST
return content, status_code
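
For quick reference, this is where each dropped EngineClient constructor argument is now read from, per the hunks above. The dict below is purely illustrative (its name is made up); model_name_or_path, tokenizer, and data_parallel_size have no visible replacement in these hunks and are simply no longer passed.

```python
# Illustrative mapping of old EngineClient kwargs to their new FDConfig source
# (fd_config is the FDConfig instance passed to __init__).
OLD_KWARG_TO_FD_CONFIG_FIELD = {
    "tensor_parallel_size":  "fd_config.parallel_config.tensor_parallel_size",
    "max_model_len":         "fd_config.model_config.max_model_len",
    "enable_logprob":        "fd_config.model_config.enable_logprob",
    "reasoning_parser":      "fd_config.structured_outputs_config.reasoning_parser",
    "limit_mm_per_prompt":   "fd_config.limit_mm_per_prompt",
    "mm_processor_kwargs":   "fd_config.mm_processor_kwargs",
    "tool_parser":           "fd_config.tool_parser",
    "enable_prefix_caching": "fd_config.cache_config.enable_prefix_caching",
    "max_processor_cache":   "fd_config.cache_config.max_processor_cache",
    "splitwise_role":        "fd_config.scheduler_config.splitwise_role",
}
```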


@@ -175,25 +175,12 @@ async def lifespan(app: FastAPI):
model_paths = [ModelPath(name=served_model_names, model_path=args.model, verification=verification)]
engine_args = EngineArgs.from_cli_args(args)
config = engine_args.create_engine_config(port_availability_check=False)
fd_config = engine_args.create_engine_config(port_availability_check=False)
engine_client = EngineClient(
model_name_or_path=args.model,
tokenizer=args.tokenizer,
max_model_len=args.max_model_len,
tensor_parallel_size=args.tensor_parallel_size,
pid=pid,
port=int(os.environ.get("INFERENCE_MSG_QUEUE_ID", "0")),
limit_mm_per_prompt=args.limit_mm_per_prompt,
mm_processor_kwargs=args.mm_processor_kwargs,
reasoning_parser=args.reasoning_parser,
data_parallel_size=args.data_parallel_size,
enable_logprob=args.enable_logprob,
fd_config=fd_config,
workers=args.workers,
tool_parser=args.tool_call_parser,
enable_prefix_caching=args.enable_prefix_caching,
splitwise_role=args.splitwise_role,
max_processor_cache=args.max_processor_cache,
config=config,
)
await engine_client.connection_manager.initialize()
app.state.dynamic_load_weight = args.dynamic_load_weight
@@ -224,14 +211,14 @@ async def lifespan(app: FastAPI):
embedding_handler = OpenAIServingEmbedding(
engine_client,
app.state.model_handler,
config,
fd_config,
pid,
args.ips,
args.max_waiting_time,
chat_template,
)
reward_handler = OpenAIServingReward(
engine_client, app.state.model_handler, config, pid, args.ips, args.max_waiting_time, chat_template
engine_client, app.state.model_handler, fd_config, pid, args.ips, args.max_waiting_time, chat_template
)
engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
engine_client.pid = pid


@@ -352,22 +352,12 @@ def create_model_paths(args: Namespace) -> List[ModelPath]:
async def initialize_engine_client(args: Namespace, pid: int) -> EngineClient:
"""Initialize and configure the engine client."""
engine_args = EngineArgs.from_cli_args(args)
config = engine_args.create_engine_config(port_availability_check=False)
fd_config = engine_args.create_engine_config(port_availability_check=False)
engine_client = EngineClient(
model_name_or_path=args.model,
tokenizer=args.tokenizer,
max_model_len=args.max_model_len,
tensor_parallel_size=args.tensor_parallel_size,
pid=pid,
port=int(args.engine_worker_queue_port[args.local_data_parallel_id]),
limit_mm_per_prompt=args.limit_mm_per_prompt,
mm_processor_kwargs=args.mm_processor_kwargs,
reasoning_parser=args.reasoning_parser,
data_parallel_size=args.data_parallel_size,
enable_logprob=args.enable_logprob,
fd_config=fd_config,
workers=args.workers,
tool_parser=args.tool_call_parser,
config=config,
)
await engine_client.connection_manager.initialize()
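
Put together, the kept and added lines of this hunk give initialize_engine_client roughly the following shape after the change (a sketch assembled from the diff; anything past the end of the hunk is omitted):

```python
async def initialize_engine_client(args: Namespace, pid: int) -> EngineClient:
    """Initialize and configure the engine client."""
    engine_args = EngineArgs.from_cli_args(args)
    fd_config = engine_args.create_engine_config(port_availability_check=False)
    engine_client = EngineClient(
        pid=pid,
        port=int(args.engine_worker_queue_port[args.local_data_parallel_id]),
        fd_config=fd_config,
        workers=args.workers,
    )
    await engine_client.connection_manager.initialize()
    # ... rest of the function is unchanged and not shown in this hunk
```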

File diff suppressed because it is too large.