[Features] support hugging face qwen3 moe (#3649)

* split ut

* qwen3-30B-A3B

* fix

* add test

* add test_torch_model.py

* fix test_torch_model.py

* delete print

* fix moe

* delete init.py

* fix

* fix

---------

Co-authored-by: bukejiyu <395822456@qq.com>
Co-authored-by: bukejiyu <52310069+bukejiyu@users.noreply.github.com>
Author: lizexu123
Date: 2025-08-30 15:26:05 +08:00
Committed by: GitHub
Parent: f206474cc7
Commit: 455205f991
9 changed files with 437 additions and 258 deletions
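At a high level, the change threads a model_format attribute from fd_config.model_config down to the linear and fused-MoE weight parameters, so the weight loaders can recognize Hugging Face (torch-layout) checkpoints and transpose their 2-D weights into Paddle's layout. Below is a minimal, self-contained sketch of that layout conversion using a NumPy stand-in; to_paddle_layout is a hypothetical helper for illustration, not FastDeploy API.

import numpy as np

def to_paddle_layout(loaded_weight: np.ndarray, model_format: str) -> np.ndarray:
    # torch's nn.Linear stores weights as [out_features, in_features];
    # Paddle's linear layers expect [in_features, out_features], hence the
    # transpose when model_format == "torch" (mirrors loaded_weight.transpose([1, 0]) in the diff).
    if model_format == "torch":
        return loaded_weight.transpose(1, 0)
    return loaded_weight

hf_weight = np.zeros((768, 1024))          # [out, in] as stored in a torch/HF checkpoint
paddle_weight = to_paddle_layout(hf_weight, "torch")
assert paddle_weight.shape == (1024, 768)  # [in, out] as Paddle expects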

View File

@@ -294,6 +294,7 @@ class ReplicatedLinear(LinearBase):
            weight_loader=(
                self.weight_loader if hasattr(self, "weight_loader") else default_weight_loader(self.fd_config)
            ),
+            model_format=fd_config.model_config.model_format,
        )
@@ -446,7 +447,6 @@ class MergedColumnParallelLinear(ColumnParallelLinear):
            shard_size = (self.local_rank + 1) * block_size
            loaded_weight = slice_fn(loaded_weight, output_dim, start=shard_offset, end=shard_size)
-        loaded_weight = get_tensor(loaded_weight)
        if not param._is_initialized():
            param.initialize()
        param_shard_size = output_size // 2
@@ -574,7 +574,6 @@ class QKVParallelLinear(ColumnParallelLinear):
            shard_size = (shard_id + 1) * block_size
            loaded_weight = slice_fn(loaded_weight, output_dim, start=shard_offset, end=shard_size)
-        loaded_weight = get_tensor(loaded_weight)
        if not param._is_initialized():
            param.initialize()

View File

@@ -19,7 +19,7 @@ from abc import abstractmethod
import paddle
from paddle import nn

-from fastdeploy.model_executor.utils import set_weight_attrs
+from fastdeploy.model_executor.utils import default_weight_loader, set_weight_attrs
from fastdeploy.platforms import current_platform

from ..quantization.quant_base import QuantMethodBase
@@ -205,5 +205,17 @@ class UnquantizedFusedMoEMethod(MoEMethodBase):
            default_initializer=paddle.nn.initializer.Constant(0),
        )
-        set_weight_attrs(layer.up_gate_proj_weight, extra_weight_attrs)
-        set_weight_attrs(layer.down_proj_weight, extra_weight_attrs)
+        set_weight_attrs(
+            layer.up_gate_proj_weight,
+            {
+                "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)),
+                "model_format": extra_weight_attrs.get("model_format", ""),
+            },
+        )
+        set_weight_attrs(
+            layer.down_proj_weight,
+            {
+                "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)),
+                "model_format": extra_weight_attrs.get("model_format", ""),
+            },
+        )

View File

@@ -151,7 +151,9 @@ class FusedMoE(nn.Layer):
            self.gate_correction_bias = gate_correction_bias
        else:
            self.gate_correction_bias = None
-        self.quant_method.create_weights(self, weight_loader=self.weight_loader)
+        self.quant_method.create_weights(
+            self, weight_loader=self.weight_loader, model_format=fd_config.model_config.model_format
+        )

        logger.info(
            f"{moe_tag}MoE config is {num_experts=}[{expert_id_offset}, {expert_id_offset + self.num_local_experts}), \
@@ -197,6 +199,9 @@
        )

    def _load_gate_up_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim=None):
+        model_format = getattr(param, "model_format", "")
+        if model_format == "torch":
+            loaded_weight = loaded_weight.transpose([1, 0])
        dim = -1 if shard_dim else 0
        if self.tp_size > 1:
            if isinstance(loaded_weight, (np.ndarray, paddle.Tensor)):
@@ -208,8 +213,6 @@
                shard_size = (self.tp_rank + 1) * block_size
                loaded_weight = slice_fn(loaded_weight, shard_dim, shard_offset, shard_size)
-        loaded_weight = get_tensor(loaded_weight)
        expert_param = param[expert_id - self.expert_id_offset]
        param_shard_size = expert_param.shape[dim] // 2
        if shard_id == "gate":
@@ -229,6 +232,7 @@
            )
        # To ensure compatibility across backends, apply an extra transpose for GCU and XPU
+        if current_platform.is_xpu() or current_platform.is_gcu():
            if expert_param.shape != loaded_weight.shape:
                loaded_weight = loaded_weight.transpose([1, 0])
        assert expert_param.shape == loaded_weight.shape, (
@@ -237,6 +241,9 @@
        expert_param.copy_(loaded_weight, False)

    def _load_down_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim=None):
+        model_format = getattr(param, "model_format", "")
+        if model_format == "torch":
+            loaded_weight = loaded_weight.transpose([1, 0])
        if self.tp_size > 1 and shard_dim is not None:
            dim = -1 if shard_dim else 0
            if isinstance(loaded_weight, (np.ndarray, paddle.Tensor)):
@@ -247,12 +254,12 @@
                shard_offset = self.tp_rank * block_size
                shard_size = (self.tp_rank + 1) * block_size
                loaded_weight = slice_fn(loaded_weight, shard_dim, shard_offset, shard_size)
-        loaded_weight = get_tensor(loaded_weight)
        expert_param = param[expert_id - self.expert_id_offset]
        if hasattr(param, "tensor_track"):
            # for dyn quant
            param.tensor_track.mark(start=0, batch_id=expert_id - self.expert_id_offset)
        # To ensure compatibility across backends, apply an extra transpose for GCU and XPU
+        if current_platform.is_xpu or current_platform.is_gcu():
            if expert_param.shape != loaded_weight.shape:
                loaded_weight = loaded_weight.transpose([1, 0])
        assert expert_param.shape == loaded_weight.shape, (
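The loader-side pattern added in the hunks above can be read in isolation: set_weight_attrs attaches model_format (and a weight_loader) to each parameter, and _load_gate_up_weight / _load_down_weight consult that attribute via getattr and transpose torch-layout weights before slicing and copying. A rough NumPy sketch of that flow follows; DummyParam and load_expert_weight are illustrative stand-ins, not the actual FusedMoE code.

import numpy as np

class DummyParam:
    """Stand-in for a Paddle parameter that carries loader metadata."""
    def __init__(self, shape, model_format=""):
        self.data = np.zeros(shape)
        self.model_format = model_format  # what set_weight_attrs attaches in the diff

def load_expert_weight(param: DummyParam, loaded_weight: np.ndarray) -> None:
    # Mirrors the new checks in _load_gate_up_weight / _load_down_weight:
    # torch-format checkpoints are transposed into Paddle layout before the copy.
    if getattr(param, "model_format", "") == "torch":
        loaded_weight = loaded_weight.transpose(1, 0)
    assert param.data.shape == loaded_weight.shape
    param.data[...] = loaded_weight

param = DummyParam((1024, 768), model_format="torch")
load_expert_weight(param, np.ones((768, 1024)))  # checkpoint slice in torch's [out, in] layout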

View File

@@ -11,48 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import signal
-import socket
-import subprocess
import time
from typing import Any, Union

import pytest

+from model_loader.utils import clean_ports
-
-def kill_process_on_port(port: int):
-    """
-    Kill processes that are listening on the given port.
-    Uses `lsof` to find process ids and sends SIGKILL.
-    """
-    try:
-        output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
-        for pid in output.splitlines():
-            os.kill(int(pid), signal.SIGKILL)
-            print(f"Killed process on port {port}, pid={pid}")
-    except subprocess.CalledProcessError:
-        pass
-
-
-def clean_ports(ports_to_clean: list[int]):
-    """
-    Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
-    """
-    for port in ports_to_clean:
-        kill_process_on_port(port)
-
-
-def is_port_open(host: str, port: int, timeout=1.0):
-    """
-    Check if a TCP port is open on the given host.
-    Returns True if connection succeeds, False otherwise.
-    """
-    try:
-        with socket.create_connection((host, port), timeout):
-            return True
-    except Exception:
-        return False
-

class FDRunner:
@@ -93,6 +56,7 @@ class FDRunner:
                sample_output_ids: list[list[int]] = []
                sample_output_strs: list[str] = []
                for output in req_outputs:
+                    print("output", output)
                    sample_output_ids.append(output.outputs.token_ids)
                    sample_output_strs.append(output.outputs.text)
                outputs.append((sample_output_ids, sample_output_strs))

View File

@@ -13,151 +13,30 @@
# limitations under the License.

import os
-import shutil
-import traceback
-import warnings
-from multiprocessing import Process, Queue
+import sys

import pytest

-os.environ["LOAD_STATE_DICT_THREAD_NUM"] = "1"
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.abspath(os.path.join(current_dir, ".."))
+if project_root not in sys.path:
+    sys.path.insert(0, project_root)
+
+from tests.model_loader.utils import (
+    check_tokens_id_and_text_close,
+    form_model_get_output_topp0,
+    get_paddle_model_path,
+    run_with_timeout,
+)

FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8313))
-MAX_WAIT_SECONDS = 60 * 5

prompts = ["解释下“温故而知新", "Hello, how are you?"]

-TokensIdText = list[tuple[list[int], str]]
-# (token_ids, text)
-
-
-def get_model_paths(base_model_name: str) -> tuple[str, str]:
-    """return (fastdeploy_path, huggingface_path)"""
-    # FastDeploy model path
-    fd_base_path = os.getenv("MODEL_PATH")
-    if fd_base_path:
-        fd_model_path = os.path.join(fd_base_path, base_model_name)
-    else:
-        fd_model_path = base_model_name
-    # HuggingFace model path
-    torch_model_path = os.path.join(
-        fd_base_path,
-        "torch",
-        base_model_name,
-    )
-    return fd_model_path, torch_model_path
-
-
-def clear_logs():
-    log_path = os.path.join(os.getcwd(), "log")
-    if os.path.exists(log_path):
-        try:
-            shutil.rmtree(log_path)
-            print(f"Deleted log directory: {log_path}")
-        except Exception as e:
-            print(f"Failed to delete log directory {log_path}: {e}")
-    else:
-        print(f"No log directory found at {log_path}")
-
-
-def print_logs():
-    log_dir = os.path.join(os.getcwd(), "log")
-    log_file = os.path.join(log_dir, "workerlog.0")
-    if not os.path.exists(log_file):
-        print(f"Log file {log_file} does not exist.")
-        return
-    print(f"\n===== {log_file} start =====")
-    with open(log_file, "r") as f:
-        for line in f:
-            print(line, end="")
-    print(f"\n===== {log_file} end =====\n")
-
-
-def check_tokens_id_and_text_close(
-    *,
-    outputs_0_lst: TokensIdText,
-    outputs_1_lst: TokensIdText,
-    name_0: str,
-    name_1: str,
-    warn_on_mismatch: bool = True,
-) -> None:
-    assert len(outputs_0_lst) == len(outputs_1_lst)
-
-    for prompt_idx, (outputs_0, outputs_1) in enumerate(zip(outputs_0_lst, outputs_1_lst)):
-        assert len(outputs_0) == len(outputs_1)
-        output_ids_0, output_str_0 = outputs_0
-        output_ids_1, output_str_1 = outputs_1
-
-        # Loop through generated tokens.
-        for idx, (output_id_0, output_id_1) in enumerate(zip(output_ids_0, output_ids_1)):
-            is_tok_mismatch = output_id_0 != output_id_1
-            if is_tok_mismatch and warn_on_mismatch:
-                fail_msg = (
-                    f"Test{prompt_idx}:"
-                    f"\nMatched tokens:\t{output_ids_0[:idx]}"
-                    f"\n{name_0}:\t{output_str_0!r}"
-                    f"\n{name_1}:\t{output_str_1!r}"
-                )
-                with warnings.catch_warnings():
-                    warnings.simplefilter("always")
-                    warnings.warn(fail_msg, stacklevel=2)
-                break
-        else:
-            if output_str_0 != output_str_1 and warn_on_mismatch:
-                fail_msg = f"Test{prompt_idx}:" f"\n{name_0}:\t{output_str_0!r}" f"\n{name_1}:\t{output_str_1!r}"
-                with warnings.catch_warnings():
-                    warnings.simplefilter("always")
-                    warnings.warn(fail_msg, stacklevel=2)
-
-
-def form_model_get_output(
-    fd_runner,
-    model_path,
-    tensor_parallel_size,
-    max_model_len,
-    max_tokens,
-    quantization,
-    load_choices,
-    result_queue,
-):
-    try:
-        with fd_runner(
-            model_path,
-            tensor_parallel_size=tensor_parallel_size,
-            max_model_len=max_model_len,
-            load_choices=load_choices,
-            quantization=quantization,
-            engine_worker_queue_port=FD_ENGINE_QUEUE_PORT,
-        ) as fd_model:
-            fd_outputs = fd_model.generate_topp0(prompts, max_tokens=max_tokens)
-            result_queue.put(fd_outputs)
-    except Exception:
-        print(f"Failed using {load_choices} laoder to load model from {model_path}.")
-        traceback.print_exc()
-        pytest.fail(f"Failed to initialize LLM model from {model_path}")
-
-
-def run_with_timeout(target, args, timeout=60 * 5):
-    clear_logs()
-    result_queue = Queue()
-    p = Process(target=target, args=(*args, result_queue))
-    p.start()
-    p.join(timeout)
-    if p.is_alive():
-        p.terminate()
-        print_logs()
-        raise RuntimeError("Worker process hung and was terminated")
-    try:
-        return result_queue.get(timeout=60)
-    except Exception as e:
-        raise RuntimeError(f"Failed to get result from worker: {e}")
-
-
model_param_map = {
    "Qwen3-0.6B": {
-        "quantizations": ["None", "wint4", "wint8"],
+        "quantizations": ["None", "wint8", "wint4"],
    },
    "ernie-4_5-21b-a3b-bf16-paddle": {
        "tensor_parallel_size": 2,
@@ -217,22 +96,38 @@ def test_common_model(
    env,
    monkeypatch,
) -> None:
-    base_path = os.getenv("MODEL_PATH")
-    if base_path:
-        model_path = os.path.join(base_path, model_name_or_path)
-    else:
-        model_path = model_name_or_path
+    model_path = get_paddle_model_path(model_name_or_path)
    if env:
        for k, v in env.items():
            monkeypatch.setenv(k, v)

    fd_outputs_v0 = run_with_timeout(
-        target=form_model_get_output,
-        args=(fd_runner, model_path, tensor_parallel_size, max_model_len, max_tokens, quantization, "default"),
+        target=form_model_get_output_topp0,
+        args=(
+            fd_runner,
+            model_path,
+            tensor_parallel_size,
+            max_model_len,
+            max_tokens,
+            quantization,
+            "default",
+            FD_ENGINE_QUEUE_PORT,
+            prompts,
+        ),
    )
    fd_outputs_v1 = run_with_timeout(
-        target=form_model_get_output,
-        args=(fd_runner, model_path, tensor_parallel_size, max_model_len, max_tokens, quantization, "default_v1"),
+        target=form_model_get_output_topp0,
+        args=(
+            fd_runner,
+            model_path,
+            tensor_parallel_size,
+            max_model_len,
+            max_tokens,
+            quantization,
+            "default_v1",
+            FD_ENGINE_QUEUE_PORT,
+            prompts,
+        ),
    )
    check_tokens_id_and_text_close(
        outputs_0_lst=fd_outputs_v0,
@@ -240,66 +135,3 @@
        name_0="default loader",
        name_1="default_v1 loader",
    )
-
-
-hugging_face_model_param_map = {
-    "Qwen2.5-7B-Instruct": {
-        "tensor_parallel_size": 2,
-        "quantizations": ["None"],
-    },
-}
-
-hf_params = []
-for model, cfg in hugging_face_model_param_map.items():
-    for q in cfg["quantizations"]:
-        hf_params.append(
-            pytest.param(
-                model,
-                cfg.get("tensor_parallel_size", 1),
-                cfg.get("max_model_len", 1024),
-                q,
-                cfg.get("max_tokens", 32),
-                marks=[pytest.mark.core_model],
-            )
-        )
-
-
-@pytest.mark.parametrize(
-    "model_name_or_path,tensor_parallel_size,max_model_len,quantization,max_tokens",
-    hf_params,
-)
-def test_paddle_vs_torch_model(
-    fd_runner,
-    model_name_or_path: str,
-    tensor_parallel_size: int,
-    max_model_len: int,
-    max_tokens: int,
-    quantization: str,
-) -> None:
-    fd_model_path, torch_model_path = get_model_paths(model_name_or_path)
-    paddle_outputs = run_with_timeout(
-        target=form_model_get_output,
-        args=(fd_runner, fd_model_path, tensor_parallel_size, max_model_len, max_tokens, quantization, "default"),
-    )
-
-    hf_outputs = run_with_timeout(
-        target=form_model_get_output,
-        args=(
-            fd_runner,
-            torch_model_path,
-            tensor_parallel_size,
-            max_model_len,
-            max_tokens,
-            quantization,
-            "default_v1",
-        ),
-    )
-
-    check_tokens_id_and_text_close(
-        outputs_0_lst=paddle_outputs,
-        outputs_1_lst=hf_outputs,
-        name_0="Paddle model (default loader)",
-        name_1="HuggingFace model (default_v1 loader)",
-    )

View File

@@ -23,6 +23,11 @@ import time
import openai
import pytest

+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.abspath(os.path.join(current_dir, ".."))
+if project_root not in sys.path:
+    sys.path.insert(0, project_root)
+
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))

View File

@@ -0,0 +1,148 @@ (new file, all lines added)
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

import pytest

current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from tests.model_loader.utils import (
    calculate_diff_rate,
    form_model_get_output_topp0,
    get_torch_model_path,
    run_with_timeout,
)

FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8313))

prompts = ["北京天安门在哪里?"]


def check_result_against_baseline(outputs, baseline_file, threshold=0.05):
    """
    Check model outputs against baseline file.
    """
    try:
        with open(baseline_file, "r", encoding="utf-8") as f:
            baseline_content = f.read().strip()
    except FileNotFoundError:
        raise AssertionError(f"Baseline file not found: {baseline_file}")

    # Combine all outputs into a single string for comparison
    current_content = ""
    for idx, output in enumerate(outputs):
        # output format: (token_ids, text)
        _, text = output
        if isinstance(text, list):
            text_str = "".join(text)
        else:
            text_str = text
        current_content += text_str

    temp_file = f"{os.path.basename(baseline_file)}-current"
    with open(temp_file, "w", encoding="utf-8") as f:
        f.write(current_content)

    diff_rate = calculate_diff_rate(current_content, baseline_content)
    if diff_rate >= threshold:
        raise AssertionError(
            f"Output differs from baseline file by too much ({diff_rate:.4%}):\n"
            f"Current output: {current_content!r}\n"
            f"Baseline content: {baseline_content!r}\n"
            f"Current output saved to: {temp_file}"
        )


hugging_face_model_param_map = {
    "Qwen2.5-7B-Instruct": {
        "tensor_parallel_size": 2,
        "quantizations": ["wint8"],
    },
    "Qwen3-30B-A3B": {
        "tensor_parallel_size": 2,
        "quantizations": ["wint8"],
    },
}

hf_params = []
for model, cfg in hugging_face_model_param_map.items():
    for q in cfg["quantizations"]:
        hf_params.append(
            pytest.param(
                model,
                cfg.get("tensor_parallel_size", 2),
                cfg.get("max_model_len", 1024),
                q,
                cfg.get("max_tokens", 100),
                marks=[pytest.mark.core_model],
            )
        )


@pytest.mark.parametrize(
    "model_name_or_path,tensor_parallel_size,max_model_len,quantization,max_tokens",
    hf_params,
)
def test_model_against_baseline(
    fd_runner,
    model_name_or_path: str,
    tensor_parallel_size: int,
    max_model_len: int,
    max_tokens: int,
    quantization: str,
) -> None:
    """
    Test that model output matches baseline file.
    """
    torch_model_path = get_torch_model_path(model_name_or_path)

    # Run model
    hf_outputs = run_with_timeout(
        target=form_model_get_output_topp0,
        args=(
            fd_runner,
            torch_model_path,
            tensor_parallel_size,
            max_model_len,
            max_tokens,
            quantization,
            "default_v1",
            FD_ENGINE_QUEUE_PORT,
            prompts,
        ),
    )

    # Determine baseline file path based on model name
    base_path = os.getenv("MODEL_PATH", "")
    # Get baseline suffix from config
    model_config = hugging_face_model_param_map.get(model_name_or_path, {})
    baseline_suffix = model_config.get("baseline_suffix", "tp2")
    baseline_filename = f"{model_name_or_path}-{baseline_suffix}"
    if base_path:
        baseline_file = os.path.join(base_path, baseline_filename)
    else:
        baseline_file = baseline_filename

    # Compare against baseline file
    check_result_against_baseline(hf_outputs, baseline_file, threshold=0.05)
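The baseline files this test reads are plain text: the concatenated generations for all prompts, stored at MODEL_PATH/<model>-<suffix> (default suffix "tp2"). The sketch below is a hypothetical helper for producing such a file from a previous run's outputs, inferred from check_result_against_baseline above; it is not part of the commit.

import os

def write_baseline(outputs, model_name: str, suffix: str = "tp2") -> str:
    """Write concatenated generated text in the format check_result_against_baseline expects."""
    base_path = os.getenv("MODEL_PATH", "")
    filename = f"{model_name}-{suffix}"
    baseline_file = os.path.join(base_path, filename) if base_path else filename
    # outputs is a list of (token_ids, text) pairs, matching the test's output format
    text = "".join("".join(t) if isinstance(t, list) else t for _, t in outputs)
    with open(baseline_file, "w", encoding="utf-8") as f:
        f.write(text)
    return baseline_file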

tests/model_loader/utils.py (new file, 212 lines)
View File

@@ -0,0 +1,212 @@ (new file, all lines added)
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import signal
import socket
import subprocess
import traceback
from multiprocessing import Process, Queue

import pytest

TokensIdText = list[tuple[list[int], str]]
# (token_ids, text)


def clear_logs():
    log_path = os.path.join(os.getcwd(), "log")
    if os.path.exists(log_path):
        try:
            shutil.rmtree(log_path)
            print(f"Deleted log directory: {log_path}")
        except Exception as e:
            print(f"Failed to delete log directory {log_path}: {e}")
    else:
        print(f"No log directory found at {log_path}")


def print_logs():
    log_dir = os.path.join(os.getcwd(), "log")
    log_file = os.path.join(log_dir, "workerlog.0")
    if not os.path.exists(log_file):
        print(f"Log file {log_file} does not exist.")
        return
    print(f"\n===== {log_file} start =====")
    with open(log_file, "r") as f:
        for line in f:
            print(line, end="")
    print(f"\n===== {log_file} end =====\n")


def run_with_timeout(target, args, timeout=60 * 5):
    clear_logs()
    result_queue = Queue()
    p = Process(target=target, args=(*args, result_queue))
    p.start()
    p.join(timeout)
    if p.is_alive():
        p.terminate()
        print_logs()
        raise RuntimeError("Worker process hung and was terminated")
    try:
        return result_queue.get(timeout=60)
    except Exception as e:
        raise RuntimeError(f"Failed to get result from worker: {e}")


def form_model_get_output_topp0(
    fd_runner,
    model_path,
    tensor_parallel_size,
    max_model_len,
    max_tokens,
    quantization,
    load_choices,
    engine_worker_queue_port,
    prompts,
    result_queue,
):
    try:
        with fd_runner(
            model_path,
            tensor_parallel_size=tensor_parallel_size,
            max_model_len=max_model_len,
            load_choices=load_choices,
            quantization=quantization,
            engine_worker_queue_port=engine_worker_queue_port,
        ) as fd_model:
            fd_outputs = fd_model.generate_topp0(prompts, max_tokens=max_tokens)
            result_queue.put(fd_outputs)
    except Exception:
        print(f"Failed using {load_choices} loader to load model from {model_path}.")
        traceback.print_exc()
        pytest.fail(f"Failed to initialize LLM model from {model_path}")


def kill_process_on_port(port: int):
    """
    Kill processes that are listening on the given port.
    Uses `lsof` to find process ids and sends SIGKILL.
    """
    try:
        output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
        for pid in output.splitlines():
            os.kill(int(pid), signal.SIGKILL)
            print(f"Killed process on port {port}, pid={pid}")
    except subprocess.CalledProcessError:
        pass


def clean_ports(ports_to_clean: list[int]):
    """
    Kill all processes occupying the ports listed in ports_to_clean.
    """
    for port in ports_to_clean:
        kill_process_on_port(port)


def is_port_open(host: str, port: int, timeout=1.0):
    """
    Check if a TCP port is open on the given host.
    Returns True if connection succeeds, False otherwise.
    """
    try:
        with socket.create_connection((host, port), timeout):
            return True
    except Exception:
        return False


def get_paddle_model_path(base_model_name: str) -> str:
    """Return the FastDeploy (Paddle) model path under MODEL_PATH."""
    fd_base_path = os.getenv("MODEL_PATH")
    if fd_base_path:
        fd_model_path = os.path.join(fd_base_path, base_model_name)
    else:
        fd_model_path = base_model_name
    return fd_model_path


def get_torch_model_path(base_model_name: str) -> str:
    """Return the HuggingFace (torch) model path under MODEL_PATH/torch."""
    fd_base_path = os.getenv("MODEL_PATH")
    torch_model_path = os.path.join(
        fd_base_path,
        "torch",
        base_model_name,
    )
    return torch_model_path


def check_tokens_id_and_text_close(
    *,
    outputs_0_lst: TokensIdText,
    outputs_1_lst: TokensIdText,
    name_0: str,
    name_1: str,
    warn_on_mismatch: bool = True,
) -> None:
    assert len(outputs_0_lst) == len(outputs_1_lst)

    for prompt_idx, (outputs_0, outputs_1) in enumerate(zip(outputs_0_lst, outputs_1_lst)):
        assert len(outputs_0) == len(outputs_1)
        output_ids_0, output_str_0 = outputs_0
        output_ids_1, output_str_1 = outputs_1

        # Loop through generated tokens.
        for idx, (output_id_0, output_id_1) in enumerate(zip(output_ids_0, output_ids_1)):
            is_tok_mismatch = output_id_0 != output_id_1
            if is_tok_mismatch and warn_on_mismatch:
                fail_msg = (
                    f"Test{prompt_idx}:"
                    f"\nMatched tokens:\t{output_ids_0[:idx]}"
                    f"\n{name_0}:\t{output_str_0!r}"
                    f"\n{name_1}:\t{output_str_1!r}"
                )
                raise AssertionError(fail_msg)
        else:
            if output_str_0 != output_str_1 and warn_on_mismatch:
                fail_msg = f"Test{prompt_idx}:" f"\n{name_0}:\t{output_str_0!r}" f"\n{name_1}:\t{output_str_1!r}"
                raise AssertionError(fail_msg)


def calculate_diff_rate(text1, text2):
    """
    Calculate the difference rate between two strings
    based on the normalized Levenshtein edit distance.
    Returns a float in [0, 1], where 0 means identical.
    """
    if text1 == text2:
        return 0.0

    len1, len2 = len(text1), len(text2)
    dp = [[0] * (len2 + 1) for _ in range(len1 + 1)]

    for i in range(len1 + 1):
        for j in range(len2 + 1):
            if i == 0 or j == 0:
                dp[i][j] = i + j
            elif text1[i - 1] == text2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])

    edit_distance = dp[len1][len2]
    max_len = max(len1, len2)
    return edit_distance / max_len if max_len > 0 else 0.0
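As a quick sanity check of calculate_diff_rate, the classic "kitten" → "sitting" pair needs three edits, normalized by the longer length of 7. A minimal usage sketch, assuming the repository root is on sys.path so the helper above is importable:

from tests.model_loader.utils import calculate_diff_rate

assert calculate_diff_rate("kitten", "kitten") == 0.0
assert abs(calculate_diff_rate("kitten", "sitting") - 3 / 7) < 1e-9  # 3 edits / max length 7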