[FastDeploy Cli] Bench Command eval and throughput (#4239)

* bench command

* bench command

* bench command

* bench command

* bench command

---------

Co-authored-by: K11OntheBoat <your_email@example.com>
This commit is contained in:
qwes5s5
2025-10-10 16:17:44 +08:00
committed by GitHub
parent 3aa04fbf21
commit 6fd3e72da1
24 changed files with 4237 additions and 1313 deletions

View File

View File

@@ -28,8 +28,10 @@ from dataclasses import dataclass
from io import BytesIO
from typing import Any, Optional, Union
import numpy as np
from fontTools.feaLib import ast
from PIL import Image
from transformers import PreTrainedTokenizerBase
from fastdeploy.utils import FlexibleArgumentParser
@@ -320,6 +322,90 @@ class EBChatDataset(BenchmarkDataset):
return samples
class RandomDataset(BenchmarkDataset):
    """Synthetic dataset whose prompts are decoded from pseudo-random token ids.

    Input and output lengths are drawn uniformly around the requested lengths,
    within ``[length * (1 - range_ratio), length * (1 + range_ratio)]``.
    """

    # Default values copied from benchmark_serving.py for the random dataset.
    DEFAULT_PREFIX_LEN = 0
    DEFAULT_RANGE_RATIO = 0.0
    DEFAULT_INPUT_LEN = 1024
    DEFAULT_OUTPUT_LEN = 128

    def __init__(self, **kwargs) -> None:
        """Initialise the base dataset, then seed ``random`` and ``numpy.random``."""
        super().__init__(**kwargs)
        random.seed(self.random_seed)
        np.random.seed(self.random_seed)

    def sample(
        self,
        tokenizer: PreTrainedTokenizerBase,
        num_requests: int,
        prefix_len: int = DEFAULT_PREFIX_LEN,
        range_ratio: float = DEFAULT_RANGE_RATIO,
        input_len: int = DEFAULT_INPUT_LEN,
        output_len: int = DEFAULT_OUTPUT_LEN,
        **kwargs,
    ) -> list[SampleRequest]:
        """Build ``num_requests`` random prompts.

        Args:
            tokenizer: Tokenizer used to decode token ids into prompt text and
                to re-encode the text for length control.
            num_requests: Number of requests to generate.
            prefix_len: Number of random tokens shared as a prefix by every prompt.
            range_ratio: Relative half-width of the length sampling range; must be < 1.
            input_len: Target prompt length, including the tokenizer's special tokens.
            output_len: Target expected output length.
            **kwargs: Accepted and ignored.

        Returns:
            One ``SampleRequest`` per request, numbered from 1.
        """
        # Enforce range_ratio < 1
        assert range_ratio < 1.0, "random_range_ratio must be < 1.0 to ensure a valid sampling range"

        vocab_size = tokenizer.vocab_size
        # Leave room for the special tokens the tokenizer would add on its own.
        real_input_len = input_len - tokenizer.num_special_tokens_to_add()

        shared_prefix = []
        if prefix_len > 0:
            shared_prefix = np.random.randint(0, vocab_size, size=prefix_len).tolist()

        # Sampling bounds: [X * (1 - b), X * (1 + b)]
        input_low = int(real_input_len * (1 - range_ratio))
        input_high = int(real_input_len * (1 + range_ratio))
        output_low = int(output_len * (1 - range_ratio))
        output_high = int(output_len * (1 + range_ratio))

        logger.info(
            "Sampling input_len from [%s, %s] and output_len from [%s, %s]",
            input_low,
            input_high,
            output_low,
            output_high,
        )

        # The three draws below happen in this order so a fixed seed keeps
        # producing the same workload.
        input_lens = np.random.randint(input_low, input_high + 1, size=num_requests)
        output_lens = np.random.randint(output_low, output_high + 1, size=num_requests)
        offsets = np.random.randint(0, vocab_size, size=num_requests)

        samples = []
        for idx in range(num_requests):
            body_len = input_lens[idx]
            body_ids = ((offsets[idx] + idx + np.arange(body_len)) % vocab_size).tolist()
            first_pass_text = tokenizer.decode(shared_prefix + body_ids)

            # Decoding N consecutive ids can yield text that tokenizes into a
            # different number of tokens, e.g. for GPT2Tokenizer:
            #   [6880, 6881] -> ['Ġcalls', 'here'] ->
            #   [1650, 939, 486] -> ['Ġcall', 'sh', 'ere']
            # Re-encode and truncate before decoding again so the prompt
            # length stays under control.
            length_cap = prefix_len + int(body_len)
            clipped_ids = tokenizer.encode(first_pass_text, add_special_tokens=False)[:length_cap]

            samples.append(
                SampleRequest(
                    no=idx + 1,
                    prompt=tokenizer.decode(clipped_ids),
                    prompt_len=len(clipped_ids),
                    history_QA=[],
                    json_data=None,
                    expected_output_len=int(output_lens[idx]),
                )
            )
        return samples
class _ValidateDatasetArgs(argparse.Action):
"""Argparse action to validate dataset name and path compatibility."""

View File

@@ -0,0 +1,137 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
# This file is modified from https://github.com/vllm-project/vllm/blob/main/vllm/benchmarks/latency.py
import argparse
import dataclasses
import json
import time
import numpy as np
from tqdm import tqdm
import fastdeploy.envs as envs
from fastdeploy.engine.args_utils import EngineArgs
def add_cli_args(parser: argparse.ArgumentParser):
    """Register the latency-benchmark command-line options on ``parser``.

    Benchmark-specific flags are added first, followed by the engine flags
    contributed by ``EngineArgs.add_cli_args``. ``enable_prefix_caching`` is
    then defaulted to ``False``.
    """
    # Workload shape.
    parser.add_argument("--input-len", type=int, default=32)
    parser.add_argument("--output-len", type=int, default=128)
    parser.add_argument("--batch-size", type=int, default=8)
    parser.add_argument("--n", type=int, default=1, help="Number of generated sequences per prompt.")
    parser.add_argument("--use-beam-search", action="store_true")

    # Iteration counts.
    parser.add_argument(
        "--num-iters-warmup", type=int, default=10, help="Number of iterations to run for warmup."
    )
    parser.add_argument("--num-iters", type=int, default=30, help="Number of iterations to run.")

    # Run modes and output.
    parser.add_argument(
        "--profile", action="store_true", help="profile the generation process of a single batch"
    )
    parser.add_argument(
        "--output-json", type=str, default=None, help="Path to save the latency results in JSON format."
    )
    parser.add_argument(
        "--disable-detokenize",
        action="store_true",
        help="Do not detokenize responses (i.e. do not include detokenization time "
        "in the latency measurement)",
    )

    engine_parser = EngineArgs.add_cli_args(parser)
    # Prefix caching skews the latency numbers, so it is off unless requested.
    engine_parser.set_defaults(enable_prefix_caching=False)
def main(args: argparse.Namespace):
    """Measure end-to-end latency of generating one fixed batch.

    Builds an ``LLM`` from the parsed engine arguments, runs
    ``args.num_iters_warmup`` warmup iterations on a batch of random token ids,
    then times ``args.num_iters`` iterations and prints the mean and selected
    percentiles. With ``--profile`` a single extra iteration is run instead of
    the benchmark. With ``--output-json`` the results are also written to disk.

    Raises:
        OSError: If ``--profile`` is set but ``envs.VLLM_TORCH_PROFILER_DIR`` is falsy.
    """
    # The profiler directory is only looked up when profiling was requested.
    if args.profile:
        if not envs.VLLM_TORCH_PROFILER_DIR:
            raise OSError(
                "The environment variable 'VLLM_TORCH_PROFILER_DIR' is not set. "
                "Please set it to a valid path to use torch profiler."
            )

    engine_args = EngineArgs.from_cli_args(args)

    # Lazy import to avoid importing LLM when the bench command is not selected.
    from fastdeploy import LLM, SamplingParams

    # NOTE(woosuk): If the request cannot be processed in a single batch,
    # the engine will automatically process the request in multiple batches.
    llm = LLM(**dataclasses.asdict(engine_args))
    required_len = args.input_len + args.output_len
    assert (
        llm.llm_engine.cfg.max_model_len >= required_len
    ), "Please ensure that max_model_len is greater than the sum of input_len and output_len."

    sampling_params = SamplingParams(
        n=args.n,
        temperature=1.0,
        top_p=1.0,
        max_tokens=args.output_len,
    )
    # One row of random token ids per prompt in the batch.
    token_id_rows = np.random.randint(10000, size=(args.batch_size, args.input_len)).tolist()
    dummy_prompts = [{"prompt_token_ids": row} for row in token_id_rows]

    def timed_generate():
        """Run one generation pass over the batch and return its wall-clock seconds."""
        began = time.perf_counter()
        llm.generate(dummy_prompts, sampling_params=sampling_params, use_tqdm=False, stream=True)
        return time.perf_counter() - began

    print("Warming up...")
    for _ in tqdm(range(args.num_iters_warmup), desc="Warmup iterations"):
        timed_generate()

    if args.profile:
        print("Profiling...")
        timed_generate()
        return

    # Benchmark.
    latencies = np.array([timed_generate() for _ in tqdm(range(args.num_iters), desc="Profiling iterations")])
    percentages = [10, 25, 50, 75, 90, 99]
    percentiles = np.percentile(latencies, percentages)
    avg_latency = np.mean(latencies)

    print(f"Avg latency: {avg_latency} seconds")
    for pct, value in zip(percentages, percentiles):
        print(f"{pct}% percentile latency: {value} seconds")

    # Output JSON results if specified
    if not args.output_json:
        return
    results = {
        "avg_latency": avg_latency,
        "latencies": latencies.tolist(),
        "percentiles": dict(zip(percentages, percentiles.tolist())),
    }
    with open(args.output_json, "w") as f:
        json.dump(results, f, indent=4)

View File

View File

@@ -0,0 +1,90 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
# This file is modified from https://github.com/vllm-project/vllm/blob/main/benchmarks/benchmark_utils.py
import argparse
import json
import math
import os
from typing import Any
def convert_to_pytorch_benchmark_format(
    args: argparse.Namespace,
    metrics: dict[str, list],
    extra_info: dict[str, Any],
) -> list:
    """
    Convert benchmark results to the PyTorch OSS benchmark format, with one
    metric per record.
    https://github.com/pytorch/pytorch/wiki/How-to-integrate-with-PyTorch-OSS-benchmark-database

    Args:
        args: Parsed command-line arguments; ``args.model`` names the model and
            a snapshot of all arguments is stored in every record.
        metrics: Mapping of metric name to its list of benchmark values.
        extra_info: Extra metadata attached to each metric. If it contains
            ``tensor_parallel_size`` and the arguments have no truthy value for
            it, the value is copied into the record's argument snapshot.

    Returns:
        One record per metric, or an empty list when the
        ``SAVE_TO_PYTORCH_BENCHMARK_FORMAT`` environment variable is unset or empty.
    """
    records = []
    if not os.environ.get("SAVE_TO_PYTORCH_BENCHMARK_FORMAT", False):
        return records

    for name, benchmark_values in metrics.items():
        # vars(args) is the namespace's own __dict__. Snapshot it per record so
        # that filling in tensor_parallel_size below neither mutates the
        # caller's args nor makes every record share one dict.
        run_args = dict(vars(args))

        # Save tensor_parallel_size parameter if it's part of the metadata
        if not run_args.get("tensor_parallel_size") and "tensor_parallel_size" in extra_info:
            run_args["tensor_parallel_size"] = extra_info["tensor_parallel_size"]

        records.append(
            {
                "benchmark": {
                    "name": "vLLM benchmark",
                    "extra_info": {
                        "args": run_args,
                    },
                },
                "model": {
                    "name": args.model,
                },
                "metric": {
                    "name": name,
                    "benchmark_values": benchmark_values,
                    "extra_info": extra_info,
                },
            }
        )
    return records
class InfEncoder(json.JSONEncoder):
    """JSON encoder that writes infinite floats as the strings "inf" / "-inf".

    The default encoder emits the non-standard tokens ``Infinity`` and
    ``-Infinity`` for such values; this class replaces them with strings before
    encoding.
    """

    def clear_inf(self, o: Any):
        """Return a copy of ``o`` with infinite floats replaced by strings.

        Dicts, lists and tuples are walked recursively (tuples come back as
        lists, which is how JSON encodes them anyway). Positive infinity
        becomes ``"inf"`` and negative infinity ``"-inf"`` so the sign is not
        lost. Any other value is returned unchanged.
        """
        if isinstance(o, dict):
            return {k: self.clear_inf(v) for k, v in o.items()}
        if isinstance(o, (list, tuple)):
            return [self.clear_inf(v) for v in o]
        if isinstance(o, float) and math.isinf(o):
            return "inf" if o > 0 else "-inf"
        return o

    def iterencode(self, o: Any, *args, **kwargs) -> Any:
        """Scrub infinities from ``o`` and delegate to the standard encoder."""
        return super().iterencode(self.clear_inf(o), *args, **kwargs)
def write_to_json(filename: str, records: list) -> None:
    """Write ``records`` to ``filename`` as JSON, encoding with ``InfEncoder``.

    The file is opened in write mode, so existing content is replaced.
    """
    with open(filename, "w") as out_file:
        json.dump(records, out_file, cls=InfEncoder)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,464 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Benchmark offline inference throughput."""
import argparse
import dataclasses
import json
import os
import random
import time
import warnings
from typing import Any, Optional
try:
import torch
TORCH_AVAILABLE = True
except (ImportError, NameError, AttributeError, OSError):
TORCH_AVAILABLE = False
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedTokenizerBase
from fastdeploy.benchmarks.datasets import (
EBChatDataset,
EBDataset,
RandomDataset,
SampleRequest,
)
from fastdeploy.benchmarks.lib.utils import (
convert_to_pytorch_benchmark_format,
write_to_json,
)
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.request import RequestOutput
def run_fd(
    requests: list[SampleRequest],
    n: int,
    engine_args: EngineArgs,
    disable_detokenize: bool = False,
) -> tuple[float, Optional[list[RequestOutput]]]:
    """Run the offline throughput benchmark through ``LLM.generate``.

    Args:
        requests: Sampled requests. Each prompt is either a dict carrying
            ``prompt_token_ids`` (pre-tokenized) or any other value, which is
            sent as text via ``str()``.
        n: Number of sequences to generate per prompt.
        engine_args: Engine configuration used to construct the ``LLM``.
        disable_detokenize: Accepted for interface parity; not used by this backend.

    Returns:
        ``(elapsed_seconds, outputs)`` where ``elapsed_seconds`` covers only the
        ``generate`` call and ``outputs`` is whatever ``generate`` returned.
    """
    from fastdeploy import LLM, SamplingParams

    llm = LLM(**dataclasses.asdict(engine_args))
    assert all(
        llm.llm_engine.cfg.max_model_len >= (request.prompt_len + request.expected_output_len) for request in requests
    ), (
        "Please ensure that max_model_len is greater than the sum of"
        " prompt_len and expected_output_len for all requests."
    )

    # Build one prompt payload and one SamplingParams per request.
    prompts = []
    sampling_params: list[SamplingParams] = []
    for request in requests:
        multi_modal_data = getattr(request, "multi_modal_data", None)
        # Pre-tokenized input. Require a dict: on a plain string, `in` would be
        # a substring test and the key lookup below would raise TypeError.
        if isinstance(request.prompt, dict) and "prompt_token_ids" in request.prompt:
            prompt = {
                "prompt_token_ids": request.prompt["prompt_token_ids"],
                "multi_modal_data": multi_modal_data,
            }
        # Plain text input.
        else:
            prompt = {"prompt": str(request.prompt), "multi_modal_data": multi_modal_data}
        prompts.append(prompt)
        sampling_params.append(
            SamplingParams(
                n=n,
                temperature=1.0,
                top_p=1.0,
                max_tokens=request.expected_output_len,
            )
        )

    start = time.perf_counter()
    outputs = llm.generate(prompts, sampling_params, use_tqdm=True)
    end = time.perf_counter()
    return end - start, outputs
def run_fd_chat(
    requests: list[SampleRequest], n: int, engine_args: EngineArgs, disable_detokenize: bool = False
) -> tuple[float, list[RequestOutput]]:
    """
    Run the offline throughput benchmark through ``LLM.chat``.

    Each request's prompt is passed to ``chat`` unchanged, paired with its own
    ``SamplingParams`` (``max_tokens`` is the request's expected output
    length). ``disable_detokenize`` is accepted but not used here.

    Returns ``(elapsed_seconds, outputs)``; the timing covers only the
    ``chat`` call.
    """
    from fastdeploy import LLM, SamplingParams

    llm = LLM(**dataclasses.asdict(engine_args))
    max_model_len_ok = all(
        llm.llm_engine.cfg.max_model_len >= (request.prompt_len + request.expected_output_len) for request in requests
    )
    assert max_model_len_ok, (
        "Please ensure that max_model_len is greater than the sum of "
        "prompt_len and expected_output_len for all requests."
    )

    prompts = [request.prompt for request in requests]
    sampling_params: list[SamplingParams] = [
        SamplingParams(n=n, temperature=1.0, top_p=1.0, max_tokens=request.expected_output_len)
        for request in requests
    ]

    began = time.perf_counter()
    outputs = llm.chat(prompts, sampling_params, use_tqdm=True)
    elapsed = time.perf_counter() - began
    return elapsed, outputs
def run_hf(
    requests: list[SampleRequest],
    model: str,
    tokenizer: PreTrainedTokenizerBase,
    n: int,
    max_batch_size: int,
    trust_remote_code: bool,
    disable_detokenize: bool = False,
) -> float:
    """Run the throughput benchmark with a Hugging Face causal LM on CUDA.

    Requests are greedily grouped, in order, into batches of at most
    ``max_batch_size`` prompts. A batch also stops growing once its longest
    prompt plus longest expected output would exceed 2048 tokens. Each batch is
    generated with sampling enabled and, unless ``disable_detokenize`` is set,
    decoded so that detokenization time is included.

    Returns:
        Wall-clock seconds spent batching, generating and (optionally) decoding.
    """
    llm = AutoModelForCausalLM.from_pretrained(model, torch_dtype=torch.float16, trust_remote_code=trust_remote_code)
    if llm.config.model_type == "llama":
        # To enable padding in the HF backend.
        tokenizer.pad_token = tokenizer.eos_token
    llm = llm.cuda()

    pbar = tqdm(total=len(requests))
    start = time.perf_counter()

    batch: list[str] = []
    max_prompt_len = 0
    max_output_len = 0
    last_index = len(requests) - 1
    for i, request in enumerate(requests):
        # Add the prompt to the batch.
        batch.append(request.prompt)
        max_prompt_len = max(max_prompt_len, request.prompt_len)
        max_output_len = max(max_output_len, request.expected_output_len)

        if len(batch) < max_batch_size and i != last_index:
            # Keep accumulating if the next request still fits the token budget.
            upcoming = requests[i + 1]
            widest_prompt = max(max_prompt_len, upcoming.prompt_len)
            longest_output = max(max_output_len, upcoming.expected_output_len)
            if widest_prompt + longest_output <= 2048:
                continue

        # Generate the sequences.
        input_ids = tokenizer(batch, return_tensors="pt", padding=True).input_ids
        llm_outputs = llm.generate(
            input_ids=input_ids.cuda(),
            do_sample=True,
            num_return_sequences=n,
            temperature=1.0,
            top_p=1.0,
            use_cache=True,
            max_new_tokens=max_output_len,
        )
        if not disable_detokenize:
            # Include the decoding time.
            tokenizer.batch_decode(llm_outputs, skip_special_tokens=True)
        pbar.update(len(batch))

        # Start a fresh batch.
        batch = []
        max_prompt_len = 0
        max_output_len = 0

    return time.perf_counter() - start
def save_to_pytorch_benchmark_format(args: argparse.Namespace, results: dict[str, Any]) -> None:
    """Write throughput results in the PyTorch OSS benchmark format, if enabled.

    ``requests_per_second`` and ``tokens_per_second`` become the metrics.
    ``elapsed_time``, ``num_requests`` and ``total_num_tokens`` are attached as
    extra info. Nothing is written when the converter returns no records. The
    output path is ``args.output_json`` with its extension replaced by
    ``.pytorch.json``.
    """
    metric_names = ["requests_per_second", "tokens_per_second"]
    info_names = ["elapsed_time", "num_requests", "total_num_tokens"]

    pt_records = convert_to_pytorch_benchmark_format(
        args=args,
        metrics={name: [results[name]] for name in metric_names},
        extra_info={name: results[name] for name in info_names},
    )
    if not pt_records:
        return

    # Derive a sibling file name so the plain results file is left untouched.
    stem = os.path.splitext(args.output_json)[0]
    write_to_json(f"{stem}.pytorch.json", pt_records)
def get_requests(args, tokenizer):
    """Instantiate the dataset selected by ``args`` and sample requests from it.

    A random dataset is used when no dataset path is given or when
    ``--dataset-name`` is ``random``. Otherwise ``EB`` and ``EBChat`` select
    their respective dataset classes. Sampling options whose value is ``None``
    are not forwarded, so the dataset's own defaults apply.

    Raises:
        ValueError: If the dataset name is not recognised.
    """
    # Common parameters for all dataset types.
    dataset_init_kwargs = {
        "dataset_path": args.dataset_path,
        "random_seed": args.seed,
    }
    sampling_options = {
        "lora_path": args.lora_path,
        "num_requests": args.num_prompts,
        "input_len": args.input_len,
        "output_len": args.output_len,
    }

    if args.dataset_path is None or args.dataset_name == "random":
        sampling_options.update(
            range_ratio=args.random_range_ratio,
            prefix_len=args.prefix_len,
            tokenizer=tokenizer,
        )
        dataset_cls = RandomDataset
    else:
        file_backed = {"EB": EBDataset, "EBChat": EBChatDataset}
        if args.dataset_name not in file_backed:
            raise ValueError(f"Unknown dataset name: {args.dataset_name}")
        dataset_cls = file_backed[args.dataset_name]

    # Remove None values
    provided = {key: val for key, val in sampling_options.items() if val is not None}
    dataset = dataset_cls(**dataset_init_kwargs)
    return dataset.sample(**provided)
def validate_args(args):
    """
    Validate and normalise the throughput-benchmark command-line arguments.

    Side effects on ``args``:
      * ``dataset_path`` is set from the deprecated ``dataset`` option when given.
      * ``tokenizer`` falls back to ``model`` when missing or empty.
      * ``dataset_name`` is forced to ``"random"`` when no dataset path is set.

    Warnings are issued for options that are ignored by the selected dataset.

    Raises:
        ValueError: For an unsupported backend, a random dataset without
            ``input_len``, LoRA enabled without a path, or backend-specific
            options that do not match the chosen backend.
    """
    # === Deprecation and Defaulting ===
    if args.dataset is not None:
        warnings.warn(
            "The '--dataset' argument will be deprecated in the next release. "
            "Please use '--dataset-name' and '--dataset-path' instead.",
            stacklevel=2,
        )
        args.dataset_path = args.dataset

    if not getattr(args, "tokenizer", None):
        args.tokenizer = args.model

    # === Backend Validation ===
    valid_backends = {"fastdeploy", "hf", "fastdeploy-chat"}
    if args.backend not in valid_backends:
        raise ValueError(f"Unsupported backend: {args.backend}")

    # === Dataset Configuration ===
    if not args.dataset and not args.dataset_path:
        print("When dataset path is not set, it will default to random dataset")
        args.dataset_name = "random"
        if args.input_len is None:
            raise ValueError("input_len must be provided for a random dataset")

    # === Dataset Name Specific Checks ===
    # --hf-subset and --hf-split: only used when dataset_name is 'hf'.
    # ('hf' is not among the --dataset-name choices this command registers,
    # so there is no hf-specific validation here.)
    #
    # The messages below use adjacent string literals rather than a backslash
    # continuation inside the literal, which would splice the next line's
    # leading whitespace (or nothing at all) into the text.
    if args.dataset_name != "hf" and (
        getattr(args, "hf_subset", None) is not None or getattr(args, "hf_split", None) is not None
    ):
        warnings.warn(
            "--hf-subset and --hf-split will be ignored " "since --dataset-name is not 'hf'.",
            stacklevel=2,
        )

    # --random-range-ratio: only used when dataset_name is 'random'
    # NOTE(review): this command registers --random-range-ratio with default
    # 0.0 and --prefix-len with default 0, so "is not None" is true even when
    # the user did not pass them. The two warnings below then fire for every
    # non-random dataset. Confirm whether the defaults should be None.
    if args.dataset_name != "random" and args.random_range_ratio is not None:
        warnings.warn(
            "--random-range-ratio will be ignored since " "--dataset-name is not 'random'.",
            stacklevel=2,
        )

    # --prefix-len: only used when dataset_name is 'random', 'sonnet', or not
    # set.
    if args.dataset_name not in {"random", "sonnet", None} and args.prefix_len is not None:
        warnings.warn(
            "--prefix-len will be ignored since --dataset-name " "is not 'random', 'sonnet', or not set.",
            stacklevel=2,
        )

    # === LoRA Settings ===
    if getattr(args, "enable_lora", False) and args.lora_path is None:
        raise ValueError("LoRA path must be provided when enable_lora is True")

    # === Backend-specific Validations ===
    if args.backend == "hf" and args.hf_max_batch_size is None:
        raise ValueError("HF max batch size is required for HF backend")
    if args.backend != "hf" and args.hf_max_batch_size is not None:
        raise ValueError("HF max batch size is only for HF backend.")

    if args.backend in {"hf", "mii"} and getattr(args, "quantization", None) is not None:
        raise ValueError("Quantization is only for vLLM backend.")
def add_cli_args(parser: argparse.ArgumentParser):
    """Register the throughput-benchmark command-line options on ``parser``.

    Benchmark-specific flags are added first, followed by the engine flags
    contributed by ``EngineArgs.add_cli_args``. ``enable_prefix_caching`` is
    then defaulted to ``False``.
    """
    parser.add_argument("--backend", type=str, choices=["fastdeploy", "hf", "fastdeploy-chat"], default="fastdeploy")
    parser.add_argument(
        "--dataset-name",
        type=str,
        choices=["EBChat", "random", "EB"],
        help="Name of the dataset to benchmark on.",
        default="random",
    )
    # The help text is built from adjacent literals only. A backslash
    # continuation inside a literal splices the next source line into the
    # message verbatim.
    parser.add_argument(
        "--dataset",
        type=str,
        default=None,
        help="Path to the ShareGPT dataset, will be deprecated in "
        "the next release. The dataset is expected to "
        "be a json in form of list[dict[..., conversations: "
        "list[dict[..., value: <prompt_or_response>]]]]",
    )
    parser.add_argument("--dataset-path", type=str, default=None, help="Path to the dataset")
    parser.add_argument("--input-len", type=int, default=None, help="Input prompt length for each request")
    parser.add_argument(
        "--output-len",
        type=int,
        default=None,
        help="Output length for each request. Overrides the " "output length from the dataset.",
    )
    parser.add_argument("--n", type=int, default=1, help="Number of generated sequences per prompt.")
    parser.add_argument("--num-prompts", type=int, default=50, help="Number of prompts to process.")
    parser.add_argument("--hf-max-batch-size", type=int, default=None, help="Maximum batch size for HF backend.")
    parser.add_argument(
        "--output-json", type=str, default=None, help="Path to save the throughput results in JSON format."
    )
    parser.add_argument(
        "--disable-frontend-multiprocessing",
        action="store_true",
        default=False,
        help="Disable decoupled async engine frontend.",
    )
    parser.add_argument(
        "--disable-detokenize",
        action="store_true",
        help=("Do not detokenize the response (i.e. do not include " "detokenization time in the measurement)"),
    )
    # LoRA
    parser.add_argument(
        "--lora-path",
        type=str,
        default=None,
        help="Path to the lora adapters to use. This can be an absolute path, "
        "a relative path, or a Hugging Face model identifier.",
    )
    parser.add_argument(
        "--prefix-len",
        type=int,
        default=0,
        help="Number of fixed prefix tokens before the random " "context in a request (default: 0).",
    )
    # random dataset
    parser.add_argument(
        "--random-range-ratio",
        type=float,
        default=0.0,
        help="Range ratio for sampling input/output length, "
        "used only for RandomDataset. Must be in the range [0, 1) to define "
        "a symmetric sampling range "
        "[length * (1 - range_ratio), length * (1 + range_ratio)].",
    )

    # hf dataset
    parser.add_argument("--hf-subset", type=str, default=None, help="Subset of the HF dataset.")
    parser.add_argument("--hf-split", type=str, default=None, help="Split of the HF dataset.")
    parser.add_argument(
        "--trust_remote_code",
        action="store_true",
        help="Sets trust_remote_code to True to execute code to create HF Datasets from the Hub",
    )

    parser = EngineArgs.add_cli_args(parser)
    # Prefix caching is off by default for this benchmark.
    parser.set_defaults(enable_prefix_caching=False)
def main(args: argparse.Namespace):
    """Run the offline throughput benchmark selected by ``args``.

    Validates the arguments, samples the requests, runs them on the chosen
    backend (``fastdeploy``, ``fastdeploy-chat`` or ``hf``) and prints
    requests/s and tokens/s. When the backend returns request outputs, token
    counts are taken from them; otherwise they are estimated from each
    request's prompt length and expected output length. With ``--output-json``
    the summary is written to disk, plus the PyTorch benchmark format if enabled.

    Raises:
        ValueError: From ``validate_args`` or for an unknown backend.
        Exception: If the ``hf`` backend is selected but PyTorch is unavailable.
    """
    # Same defaulting rule as validate_args: tolerate a missing attribute.
    if getattr(args, "tokenizer", None) is None:
        args.tokenizer = args.model
    validate_args(args)
    if args.seed is None:
        args.seed = 0
    random.seed(args.seed)

    # Sample the requests.
    # A tokenizer is needed by the HF backend and by the random dataset, whose
    # sample() takes it as a required argument. get_requests drops None-valued
    # options, so passing None for a random dataset would end in a TypeError.
    uses_random_dataset = args.dataset_path is None or args.dataset_name == "random"
    if args.backend == "hf" or uses_random_dataset:
        tokenizer = AutoTokenizer.from_pretrained(args.tokenizer, trust_remote_code=args.trust_remote_code)
    else:
        tokenizer = None
    requests = get_requests(args, tokenizer)

    request_outputs: Optional[list[RequestOutput]] = None
    if args.backend == "fastdeploy":
        elapsed_time, request_outputs = run_fd(
            requests, args.n, EngineArgs.from_cli_args(args), args.disable_detokenize
        )
    elif args.backend == "hf":
        if not TORCH_AVAILABLE:
            raise Exception("PyTorch is not available.")
        else:
            assert args.tensor_parallel_size == 1
            elapsed_time = run_hf(
                requests,
                args.model,
                tokenizer,
                args.n,
                args.hf_max_batch_size,
                args.trust_remote_code,
                args.disable_detokenize,
            )
    elif args.backend == "fastdeploy-chat":
        elapsed_time, request_outputs = run_fd_chat(
            requests, args.n, EngineArgs.from_cli_args(args), args.disable_detokenize
        )
    else:
        raise ValueError(f"Unknown backend: {args.backend}")

    if request_outputs:
        # The fastdeploy and fastdeploy-chat backends return request outputs,
        # which are used to count the tokens actually processed.
        total_prompt_tokens = 0
        total_output_tokens = 0
        for ro in request_outputs:
            if not isinstance(ro, RequestOutput):
                continue
            total_prompt_tokens += len(ro.prompt_token_ids) if ro.prompt_token_ids else 0
            if ro.outputs and hasattr(ro.outputs, "token_ids"):
                total_output_tokens += len(ro.outputs.token_ids)
        total_num_tokens = total_prompt_tokens + total_output_tokens
    else:
        # No outputs available (e.g. the hf backend): estimate from the requests.
        total_num_tokens = sum(r.prompt_len + r.expected_output_len for r in requests)
        total_output_tokens = sum(r.expected_output_len for r in requests)
        total_prompt_tokens = total_num_tokens - total_output_tokens

    print(
        f"Throughput: {len(requests) / elapsed_time:.2f} requests/s, "
        f"{total_num_tokens / elapsed_time:.2f} total tokens/s, "
        f"{total_output_tokens / elapsed_time:.2f} output tokens/s"
    )
    print(f"Total num prompt tokens: {total_prompt_tokens}")
    print(f"Total num output tokens: {total_output_tokens}")

    # Output JSON results if specified
    if args.output_json:
        results = {
            "elapsed_time": elapsed_time,
            "num_requests": len(requests),
            "total_num_tokens": total_num_tokens,
            "requests_per_second": len(requests) / elapsed_time,
            "tokens_per_second": total_num_tokens / elapsed_time,
        }
        with open(args.output_json, "w") as f:
            json.dump(results, f, indent=4)
        save_to_pytorch_benchmark_format(args, results)

View File

@@ -1,7 +1,13 @@
from fastdeploy.entrypoints.cli.benchmark.eval import BenchmarkEvalSubcommand
from fastdeploy.entrypoints.cli.benchmark.latency import BenchmarkLatencySubcommand
from fastdeploy.entrypoints.cli.benchmark.serve import BenchmarkServingSubcommand
from fastdeploy.entrypoints.cli.benchmark.throughput import (
BenchmarkThroughputSubcommand,
)
# Names exported by this package: the benchmark subcommand classes imported above.
__all__: list[str] = [
    "BenchmarkLatencySubcommand",
    "BenchmarkServingSubcommand",
    "BenchmarkThroughputSubcommand",
    "BenchmarkEvalSubcommand",
]

View File

@@ -0,0 +1,416 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import argparse
import json
import logging
import subprocess
import sys
from functools import partial
from typing import Union
import pkg_resources
from fastdeploy.entrypoints.cli.benchmark.base import BenchmarkSubcommandBase
def _int_or_none_list_arg_type(min_len: int, max_len: int, defaults: str, value: str, split_char: str = ","):
def parse_value(item):
item = item.strip().lower()
if item == "none":
return None
try:
return int(item)
except ValueError:
raise argparse.ArgumentTypeError(f"{item} is not an integer or None")
items = [parse_value(v) for v in value.split(split_char)]
num_items = len(items)
if num_items == 1:
# Makes downstream handling the same for single and multiple values
items = items * max_len
elif num_items < min_len or num_items > max_len:
raise argparse.ArgumentTypeError(f"Argument requires {max_len} integers or None, separated by '{split_char}'")
elif num_items != max_len:
logging.warning(
f"Argument requires {max_len} integers or None, separated by '{split_char}'. "
"Missing values will be filled with defaults."
)
default_items = [parse_value(v) for v in defaults.split(split_char)]
items.extend(default_items[num_items:]) # extend items list with missing defaults
return items
def try_parse_json(value: str) -> Union[str, dict, None]:
    """Try to parse ``value`` as JSON, falling back to the raw string.

    ``None`` is passed through. A value that parses as JSON is returned in
    parsed form. A value that does not parse is returned unchanged, unless it
    contains ``{``. In that case it is treated as malformed JSON and rejected.

    Raises:
        argparse.ArgumentTypeError: If ``value`` contains ``{`` but is not valid JSON.
    """
    if value is None:
        return None

    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        if "{" not in value:
            # Not JSON at all (e.g. "key=value,key2=value2"): keep as-is.
            return value
        raise argparse.ArgumentTypeError(f"Invalid JSON: {value}. Hint: Use double quotes for JSON strings.")
    return parsed
class BenchmarkEvalSubcommand(BenchmarkSubcommandBase):
"""The `eval` subcommand for fastdeploy bench."""
name = "eval"
help = "Run evaluation using lm-evaluation-harness."
@classmethod
def add_cli_args(cls, parser: argparse.ArgumentParser) -> None:
    """Register the options of the `eval` subcommand on ``parser``.

    Values that may be given as JSON (``--model_args``, ``--gen_kwargs``) go
    through ``try_parse_json``; ``--metadata`` must be valid JSON;
    ``--seed`` is parsed into a list by ``_int_or_none_list_arg_type``.
    """
    # Model and task selection.
    parser.add_argument("--model", "-m", type=str, default="hf", help="Name of model e.g. `hf`")
    parser.add_argument(
        "--tasks",
        "-t",
        default=None,
        type=str,
        metavar="task1,task2",
        help="Comma-separated list of task names or task groupings to evaluate on.\nTo get full list of tasks, use one of the commands `lm-eval --tasks {{list_groups,list_subtasks,list_tags,list}}` to list out all available names for task groupings; only (sub)tasks; tags; or all of the above",
    )
    parser.add_argument(
        "--model_args",
        "-a",
        default="",
        type=try_parse_json,
        help="""Comma separated string or JSON formatted arguments for model, e.g. `pretrained=EleutherAI/pythia-160m,dtype=float32` or '{"pretrained":"EleutherAI/pythia-160m","dtype":"float32"}'""",
    )
    parser.add_argument(
        "--num_fewshot",
        "-f",
        type=int,
        default=None,
        metavar="N",
        help="Number of examples in few-shot context",
    )
    # Batching and device.
    parser.add_argument(
        "--batch_size",
        "-b",
        type=str,
        default=1,
        metavar="auto|auto:N|N",
        help="Acceptable values are 'auto', 'auto:N' or N, where N is an integer. Default 1.",
    )
    parser.add_argument(
        "--max_batch_size",
        type=int,
        default=None,
        metavar="N",
        help="Maximal batch size to try with --batch_size auto.",
    )
    parser.add_argument(
        "--device",
        type=str,
        default=None,
        help="Device to use (e.g. cuda, cuda:0, cpu).",
    )
    # Output, sampling limits and caching.
    parser.add_argument(
        "--output_path",
        "-o",
        default=None,
        type=str,
        metavar="DIR|DIR/file.json",
        help="Path where result metrics will be saved. Can be either a directory or a .json file. If the path is a directory and log_samples is true, the results will be saved in the directory. Else the parent directory will be used.",
    )
    parser.add_argument(
        "--limit",
        "-L",
        type=float,
        default=None,
        metavar="N|0<N<1",
        help="Limit the number of examples per task. "
        "If <1, limit is a percentage of the total number of examples.",
    )
    parser.add_argument(
        "--samples",
        "-E",
        default=None,
        type=str,
        metavar="/path/to/json",
        help='JSON string or path to JSON file containing doc indices of selected examples to test. Format: {"task_name":[indices],...}',
    )
    parser.add_argument(
        "--use_cache",
        "-c",
        type=str,
        default=None,
        metavar="DIR",
        help="A path to a sqlite db file for caching model responses. `None` if not caching.",
    )
    parser.add_argument(
        "--cache_requests",
        type=str,
        default=None,
        choices=["true", "refresh", "delete"],
        help="Speed up evaluation by caching the building of dataset requests. `None` if not caching.",
    )
    parser.add_argument(
        "--check_integrity",
        action="store_true",
        help="Whether to run the relevant part of the test suite for the tasks.",
    )
    parser.add_argument(
        "--write_out",
        "-w",
        action="store_true",
        default=False,
        help="Prints the prompt for the first few documents.",
    )
    parser.add_argument(
        "--log_samples",
        "-s",
        action="store_true",
        default=False,
        help="If True, write out all model outputs and documents for per-sample measurement and post-hoc analysis. Use with --output_path.",
    )
    # Prompt construction.
    parser.add_argument(
        "--system_instruction",
        type=str,
        default=None,
        help="System instruction to be used in the prompt",
    )
    # nargs="?" with const=True: the bare flag yields True, an explicit value
    # yields that string, and omitting the flag yields False.
    parser.add_argument(
        "--apply_chat_template",
        type=str,
        nargs="?",
        const=True,
        default=False,
        help=(
            "If True, apply chat template to the prompt. "
            "Providing `--apply_chat_template` without an argument will apply the default chat template to the prompt. "
            "To apply a specific template from the available list of templates, provide the template name as an argument. "
            "E.g. `--apply_chat_template template_name`"
        ),
    )
    parser.add_argument(
        "--fewshot_as_multiturn",
        action="store_true",
        default=False,
        help="If True, uses the fewshot as a multi-turn conversation",
    )
    parser.add_argument(
        "--show_config",
        action="store_true",
        default=False,
        help="If True, shows the the full config of all tasks at the end of the evaluation.",
    )
    parser.add_argument(
        "--include_path",
        type=str,
        default=None,
        metavar="DIR",
        help="Additional path to include if there are external tasks to include.",
    )
    parser.add_argument(
        "--gen_kwargs",
        type=try_parse_json,
        default=None,
        help=(
            "Either comma delimited string or JSON formatted arguments for model generation on greedy_until tasks,"
            """ e.g. '{"temperature":0.7,"until":["hello"]}' or temperature=0,top_p=0.1."""
        ),
    )
    # Logging and result reporting.
    parser.add_argument(
        "--verbosity",
        "-v",
        type=str.upper,
        default=None,
        metavar="CRITICAL|ERROR|WARNING|INFO|DEBUG",
        help="(Deprecated) Controls logging verbosity level. Use the `LOGLEVEL` environment variable instead. Set to DEBUG for detailed output when testing or adding new task configurations.",
    )
    parser.add_argument(
        "--wandb_args",
        type=str,
        default="",
        help="Comma separated string arguments passed to wandb.init, e.g. `project=lm-eval,job_type=eval",
    )
    parser.add_argument(
        "--wandb_config_args",
        type=str,
        default="",
        help="Comma separated string arguments passed to wandb.config.update. Use this to trace parameters that aren't already traced by default. eg. `lr=0.01,repeats=3",
    )
    parser.add_argument(
        "--hf_hub_log_args",
        type=str,
        default="",
        help="Comma separated string arguments passed to Hugging Face Hub's log function, e.g. `hub_results_org=EleutherAI,hub_repo_name=lm-eval-results`",
    )
    parser.add_argument(
        "--predict_only",
        "-x",
        action="store_true",
        default=False,
        help="Use with --log_samples. Only model outputs will be saved and metrics will not be evaluated.",
    )
    # Seeds: the parser accepts 1, 3 or 4 comma-separated values and always
    # produces a 4-element list (see _int_or_none_list_arg_type).
    default_seed_string = "0,1234,1234,1234"
    parser.add_argument(
        "--seed",
        type=partial(_int_or_none_list_arg_type, 3, 4, default_seed_string),
        default=default_seed_string,  # for backward compatibility
        help=(
            "Set seed for python's random, numpy, torch, and fewshot sampling.\n"
            "Accepts a comma-separated list of 4 values for python's random, numpy, torch, and fewshot sampling seeds, "
            "respectively, or a single integer to set the same seed for all four.\n"
            f"The values are either an integer or 'None' to not set the seed. Default is `{default_seed_string}` "
            "(for backward compatibility).\n"
            "E.g. `--seed 0,None,8,52` sets `random.seed(0)`, `torch.manual_seed(8)`, and fewshot sampling seed to 52. "
            "Here numpy's seed is not set since the second value is `None`.\n"
            "E.g, `--seed 42` sets all four seeds to 42."
        ),
    )
    # Safety switches and task metadata.
    parser.add_argument(
        "--trust_remote_code",
        action="store_true",
        help="Sets trust_remote_code to True to execute code to create HF Datasets from the Hub",
    )
    parser.add_argument(
        "--confirm_run_unsafe_code",
        action="store_true",
        help="Confirm that you understand the risks of running unsafe code for tasks that require it",
    )
    parser.add_argument(
        "--metadata",
        type=json.loads,
        default=None,
        help="""JSON string metadata to pass to task configs, for example '{"max_seq_lengths":[4096,8192]}'. Will be merged with model_args. Can also be set in task config.""",
    )
@staticmethod
def cmd(args: argparse.Namespace) -> None:
    """Build an ``lm-eval`` command line from ``args`` and execute it.

    The installed ``lm_eval`` distribution must be exactly version 0.4.9.1;
    otherwise a message is printed to stderr and the process exits with
    status 1. Every option that is set on ``args`` is forwarded to the
    ``lm-eval`` executable, the assembled command is echoed to stdout, and
    the executable is then run as a subprocess.

    Args:
        args: Parsed command-line arguments of the eval subcommand.

    Raises:
        SystemExit: With status 1 if ``lm_eval`` is not installed, has a
            different version, or the ``lm-eval`` executable cannot be
            found; with the subprocess's return code if ``lm-eval`` exits
            with a non-zero status.
    """

    def _kv_string(value):
        # Dicts are flattened to "k1=v1,k2=v2"; anything else is passed through as-is.
        if isinstance(value, dict):
            return ",".join(f"{k}={v}" for k, v in value.items())
        return value

    # Check that the installed lm_eval is exactly version 0.4.9.1.
    try:
        version = pkg_resources.get_distribution("lm_eval").version
    except pkg_resources.DistributionNotFound:
        print(
            "Error: lm_eval is not installed. Please install version 0.4.9.1 with:\n"
            "pip install lm_eval==0.4.9.1",
            file=sys.stderr,
        )
        sys.exit(1)
    if version != "0.4.9.1":
        print(
            f"Warning: lm_eval version {version} is installed, but version 0.4.9.1 is required.\n"
            "Please install the correct version with:\n"
            "pip install lm_eval==0.4.9.1",
            file=sys.stderr,
        )
        sys.exit(1)

    cmd = ["lm-eval"]
    if args.model:
        cmd.extend(["--model", args.model])
    # --tasks must depend on args.tasks itself: keying it on args.model would
    # drop the tasks when no model is given and would put None into the
    # command when a model is given without tasks.
    if args.tasks:
        cmd.extend(["--tasks", args.tasks])
    if args.model_args:
        cmd.extend(["--model_args", _kv_string(args.model_args)])
    if args.gen_kwargs:
        cmd.extend(["--gen_kwargs", _kv_string(args.gen_kwargs)])
    if args.batch_size:
        cmd.extend(["--batch_size", str(args.batch_size)])
    if args.output_path:
        cmd.extend(["--output_path", args.output_path])
    # --write_out is forwarded exactly once.
    if args.write_out:
        cmd.append("--write_out")
    if args.num_fewshot is not None:
        cmd.extend(["--num_fewshot", str(args.num_fewshot)])
    if args.max_batch_size is not None:
        cmd.extend(["--max_batch_size", str(args.max_batch_size)])
    if args.device:
        cmd.extend(["--device", args.device])
    if args.limit is not None:
        cmd.extend(["--limit", str(args.limit)])
    if args.samples:
        cmd.extend(["--samples", args.samples])
    if args.use_cache:
        cmd.extend(["--use_cache", args.use_cache])
    if args.cache_requests:
        cmd.extend(["--cache_requests", args.cache_requests])
    if args.check_integrity:
        cmd.append("--check_integrity")
    if args.log_samples:
        cmd.append("--log_samples")
    if args.system_instruction:
        cmd.extend(["--system_instruction", args.system_instruction])
    if args.apply_chat_template:
        # True means the bare flag; any other truthy value is forwarded as the flag's value.
        if args.apply_chat_template is True:
            cmd.append("--apply_chat_template")
        else:
            cmd.extend(["--apply_chat_template", args.apply_chat_template])
    if args.fewshot_as_multiturn:
        cmd.append("--fewshot_as_multiturn")
    if args.show_config:
        cmd.append("--show_config")
    if args.include_path:
        cmd.extend(["--include_path", args.include_path])
    if args.verbosity:
        cmd.extend(["--verbosity", args.verbosity])
    if args.wandb_args:
        cmd.extend(["--wandb_args", args.wandb_args])
    if args.wandb_config_args:
        cmd.extend(["--wandb_config_args", args.wandb_config_args])
    if args.hf_hub_log_args:
        cmd.extend(["--hf_hub_log_args", args.hf_hub_log_args])
    if args.predict_only:
        cmd.append("--predict_only")
    if args.seed:
        # A list of seeds is re-serialised to the comma-separated form.
        if isinstance(args.seed, list):
            seed_arg = ",".join(str(x) for x in args.seed)
        else:
            seed_arg = str(args.seed)
        cmd.extend(["--seed", seed_arg])
    if args.trust_remote_code:
        cmd.append("--trust_remote_code")
    if args.confirm_run_unsafe_code:
        cmd.append("--confirm_run_unsafe_code")
    if args.metadata:
        # A dict is serialised back to a JSON string for the child process.
        if isinstance(args.metadata, dict):
            metadata_arg = json.dumps(args.metadata)
        else:
            metadata_arg = str(args.metadata)
        cmd.extend(["--metadata", metadata_arg])

    # Print the command that is about to be executed.
    print("Executing command:", " ".join(cmd))
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error running lm-eval: {e}", file=sys.stderr)
        sys.exit(e.returncode)
    except FileNotFoundError:
        print("Error: lm-eval not found. Please install lm-evaluation-harness first.", file=sys.stderr)
        sys.exit(1)

View File

@@ -17,127 +17,11 @@
# This file is modified from https://github.com/vllm-project/vllm/blob/main/vllm/benchmarks/latency.py
import argparse
import dataclasses
import json
import time
import numpy as np
from tqdm import tqdm
import fastdeploy.envs as envs
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.benchmarks.latency import add_cli_args, main
from fastdeploy.entrypoints.cli.benchmark.base import BenchmarkSubcommandBase
def add_cli_args(parser: argparse.ArgumentParser):
    """Register the latency-benchmark options, then the engine options, on ``parser``.

    Args:
        parser: Argument parser that receives the options.
    """
    add = parser.add_argument

    # Workload shape.
    add("--input-len", type=int, default=32)
    add("--output-len", type=int, default=128)
    add("--batch-size", type=int, default=8)
    add("--n", type=int, default=1, help="Number of generated sequences per prompt.")
    add("--use-beam-search", action="store_true")

    # Iteration counts.
    add("--num-iters-warmup", type=int, default=10, help="Number of iterations to run for warmup.")
    add(
        "--num-iters",
        type=int,
        default=30,
        help="Number of iterations to run.",
    )

    # Profiling and reporting.
    add("--profile", action="store_true", help="profile the generation process of a single batch")
    add("--output-json", type=str, default=None, help="Path to save the latency results in JSON format.")
    add(
        "--disable-detokenize",
        action="store_true",
        help="Do not detokenize responses (i.e. do not include detokenization time in the latency measurement)",
    )

    engine_parser = EngineArgs.add_cli_args(parser)
    # V1 enables prefix caching by default, which skews the latency numbers,
    # so the default is switched off here.
    engine_parser.set_defaults(enable_prefix_caching=False)
def main(args: argparse.Namespace):
    """Time repeated ``LLM.generate`` calls on random token prompts and report latency.

    Runs ``args.num_iters_warmup`` warmup iterations. With ``args.profile`` set,
    one further iteration is run and the function returns. Otherwise
    ``args.num_iters`` iterations are timed, the mean and the
    10/25/50/75/90/99 percentiles are printed, and the results are written to
    ``args.output_json`` when that path is given.

    Args:
        args: Parsed arguments carrying the benchmark and engine options.

    Raises:
        OSError: If profiling is requested while ``envs.VLLM_TORCH_PROFILER_DIR`` is unset.
        AssertionError: If ``max_model_len`` is smaller than ``input_len + output_len``.
    """
    if args.profile and not envs.VLLM_TORCH_PROFILER_DIR:
        raise OSError(
            "The environment variable 'VLLM_TORCH_PROFILER_DIR' is not set. "
            "Please set it to a valid path to use torch profiler."
        )

    engine_args = EngineArgs.from_cli_args(args)

    # Lazy import to avoid importing LLM when the bench command is not selected.
    from fastdeploy import LLM, SamplingParams

    # NOTE(woosuk): If the request cannot be processed in a single batch,
    # the engine will automatically process the request in multiple batches.
    llm = LLM(**dataclasses.asdict(engine_args))
    required_len = args.input_len + args.output_len
    assert (
        llm.llm_engine.cfg.max_model_len >= required_len
    ), "Please ensure that max_model_len is greater than the sum of input_len and output_len."

    sampling_params = SamplingParams(
        n=args.n,
        temperature=1.0,
        top_p=1.0,
        max_tokens=args.output_len,
    )
    # One row of random token ids (drawn below 10000) per prompt in the batch.
    token_matrix = np.random.randint(10000, size=(args.batch_size, args.input_len))
    dummy_prompts = [{"prompt_token_ids": row} for row in token_matrix.tolist()]

    def run_to_completion():
        """Return the wall-clock seconds taken by one ``llm.generate`` call on the batch."""
        started = time.perf_counter()
        llm.generate(dummy_prompts, sampling_params=sampling_params, use_tqdm=False, stream=True)
        return time.perf_counter() - started

    print("Warming up...")
    for _ in tqdm(range(args.num_iters_warmup), desc="Warmup iterations"):
        run_to_completion()

    if args.profile:
        print("Profiling...")
        run_to_completion()
        return

    # Benchmark.
    latencies = np.array([run_to_completion() for _ in tqdm(range(args.num_iters), desc="Profiling iterations")])
    percentages = [10, 25, 50, 75, 90, 99]
    percentiles = np.percentile(latencies, percentages)
    avg_latency = np.mean(latencies)

    print(f"Avg latency: {avg_latency} seconds")
    for percentage, percentile in zip(percentages, percentiles):
        print(f"{percentage}% percentile latency: {percentile} seconds")

    # Output JSON results if specified
    if args.output_json:
        results = {
            "avg_latency": avg_latency,
            "latencies": latencies.tolist(),
            "percentiles": dict(zip(percentages, percentiles.tolist())),
        }
        with open(args.output_json, "w") as f:
            json.dump(results, f, indent=4)
class BenchmarkLatencySubcommand(BenchmarkSubcommandBase):
"""The `latency` subcommand for fastdeploy bench."""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,36 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
# This file is modified from https://github.com/vllm-project/vllm/blob/main/vllm/entrypoints/cli/benchmark/throughput.py
import argparse
from fastdeploy.benchmarks.throughput import add_cli_args, main
from fastdeploy.entrypoints.cli.benchmark.base import BenchmarkSubcommandBase
class BenchmarkThroughputSubcommand(BenchmarkSubcommandBase):
    """The `throughput` subcommand for fastdeploy bench.

    Both hooks delegate to ``add_cli_args`` and ``main`` imported from
    ``fastdeploy.benchmarks.throughput``.
    """

    name = "throughput"
    # NOTE(review): the upstream vLLM throughput benchmark this file is
    # modified from measures offline inference, not online serving, so the
    # help text says so; confirm against fastdeploy.benchmarks.throughput.
    help = "Benchmark offline inference throughput."

    @classmethod
    def add_cli_args(cls, parser: argparse.ArgumentParser) -> None:
        """Register the throughput benchmark's options on ``parser``."""
        add_cli_args(parser)

    @staticmethod
    def cmd(args: argparse.Namespace) -> None:
        """Run the throughput benchmark with the parsed ``args``."""
        main(args)

View File

@@ -43,7 +43,7 @@ class CollectEnvSubcommand(CLISubcommand):
"collect-env",
help="Start collecting environment information.",
description="Start collecting environment information.",
usage="vllm collect-env",
usage="fastdeploy collect-env",
)

View File

@@ -249,7 +249,10 @@ setup(
],
license="Apache 2.0",
python_requires=">=3.7",
extras_require={"test": ["pytest>=6.0"]},
extras_require={
"test": ["pytest>=6.0"],
"eval": ["lm-eval==0.4.9.1"],
},
entry_points={
"console_scripts": ["fastdeploy=fastdeploy.entrypoints.cli.main:main"],
},

View File

@@ -0,0 +1,210 @@
"""
Test cases for endpoint_request_func.py
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastdeploy.benchmarks.lib.endpoint_request_func import (
ASYNC_REQUEST_FUNCS,
OPENAI_COMPATIBLE_BACKENDS,
RequestFuncInput,
RequestFuncOutput,
async_request_deepspeed_mii,
async_request_eb_openai_chat_completions,
async_request_eb_openai_completions,
async_request_openai_audio,
async_request_openai_completions,
async_request_tgi,
async_request_trt_llm,
)
@pytest.fixture
def mock_request_input():
    """Provide a ``RequestFuncInput`` aimed at a dummy completions URL with debug enabled."""
    fields = {
        "no": 1,
        "prompt": "test prompt",
        "history_QA": None,
        "hyper_parameters": {},
        "api_url": "http://test.com/completions",
        "prompt_len": 10,
        "output_len": 20,
        "model": "test-model",
        "debug": True,
    }
    return RequestFuncInput(**fields)
@pytest.mark.asyncio
async def test_async_request_eb_openai_chat_completions(mock_request_input):
    """Feed two streamed content deltas plus ``[DONE]`` and expect a successful output."""
    # Streamed body: two content chunks followed by the terminator.
    sse_chunks = [
        b'data: {"choices": [{"delta": {"content": "Hello"}}], "usage": {"prompt_tokens_details": {"cached_tokens": 5}}}\n\n',
        b'data: {"choices": [{"delta": {"content": " World"}}]}\n\n',
        b"data: [DONE]\n\n",
    ]

    # The fake response acts as its own async context manager result.
    fake_response = MagicMock(status=200)
    fake_response.__aenter__.return_value = fake_response
    fake_response.content.__aiter__.return_value = sse_chunks

    with patch("aiohttp.ClientSession.post", return_value=fake_response):
        output = await async_request_eb_openai_chat_completions(mock_request_input)

    assert output.success is True
    assert "Hello World" in output.generated_text
    assert output.ttft > 0
@pytest.mark.asyncio
async def test_async_request_eb_openai_completions(mock_request_input):
    """Feed two streamed text chunks plus ``[DONE]`` and expect them concatenated."""
    sse_chunks = [
        b'data: {"choices": [{"text": "Test"}]}\n\n',
        b'data: {"choices": [{"text": " response"}]}\n\n',
        b"data: [DONE]\n\n",
    ]

    fake_response = MagicMock(status=200, reason="OK")
    fake_response.__aenter__.return_value = fake_response
    fake_response.content.__aiter__.return_value = sse_chunks

    with patch("aiohttp.ClientSession.post", return_value=fake_response):
        output = await async_request_eb_openai_completions(mock_request_input)

    assert output.success is True
    assert "Test response" in output.generated_text
@pytest.mark.asyncio
async def test_async_request_tgi(mock_request_input):
    """Call ``async_request_tgi`` against a mocked stream; the output is expected to be unsuccessful."""
    mock_request_input.api_url = "http://test.com/generate_stream"

    sse_chunks = [
        b'data: {"generated_text": "TGI response", "arrival_time": 1234567890}\n\n',
        b"data: [DONE]\n\n",
    ]
    fake_response = MagicMock(status=200)
    fake_response.__aenter__.return_value = fake_response
    fake_response.content.__aiter__.return_value = sse_chunks

    with patch("aiohttp.ClientSession.post", return_value=fake_response):
        output = await async_request_tgi(mock_request_input)

    assert output.success is False
@pytest.mark.asyncio
async def test_async_request_trt_llm(mock_request_input):
    """Call ``async_request_trt_llm`` against a mocked stream; the output is expected to be unsuccessful."""
    mock_request_input.api_url = "http://test.com/generate_stream"

    sse_chunks = [
        b'data: {"text_output": "TRT LLM response"}\n\n',
        b"data: [DONE]\n\n",
    ]
    fake_response = MagicMock(status=200)
    fake_response.__aenter__.return_value = fake_response
    fake_response.content.__aiter__.return_value = sse_chunks

    with patch("aiohttp.ClientSession.post", return_value=fake_response):
        output = await async_request_trt_llm(mock_request_input)

    assert output.success is False
@pytest.mark.asyncio
async def test_async_request_openai_completions(mock_request_input):
    """Feed text chunks and a usage chunk; expect the joined text and the reported token count."""
    mock_request_input.api_url = "http://test.com/completions"

    # Two text chunks, one usage-only chunk, then the terminator.
    sse_chunks = [
        b'data: {"choices": [{"text": "OpenAI"}]}\n\n',
        b'data: {"choices": [{"text": " Completions"}]}\n\n',
        b'data: {"usage": {"completion_tokens": 2}}\n\n',
        b"data: [DONE]\n\n",
    ]
    fake_response = MagicMock(status=200)
    fake_response.__aenter__.return_value = fake_response
    fake_response.content.__aiter__.return_value = sse_chunks

    with patch("aiohttp.ClientSession.post", return_value=fake_response):
        output = await async_request_openai_completions(mock_request_input)

    assert output.success is True
    assert "OpenAI Completions" in output.generated_text
    assert output.output_tokens == 2
@pytest.mark.asyncio
async def test_async_request_deepspeed_mii(mock_request_input):
    """Return a JSON body from the mocked POST and expect its text in the output."""
    payload = {"choices": [{"text": "DeepSpeed MII response"}]}

    fake_response = MagicMock(status=200)
    fake_response.__aenter__.return_value = fake_response
    # ``json`` is awaited by the code under test, hence the AsyncMock.
    fake_response.json = AsyncMock(return_value=payload)

    with patch("aiohttp.ClientSession.post", return_value=fake_response):
        output = await async_request_deepspeed_mii(mock_request_input)

    assert output.success is True
    assert "DeepSpeed MII response" in output.generated_text
@pytest.mark.asyncio
async def test_async_request_openai_audio(mock_request_input):
    """Placeholder for the audio endpoint test; always skipped because of the soundfile dependency."""
    pytest.skip("Skipping audio test due to soundfile dependency")

    # The test structure below is kept for reference but never executes.
    mock_request_input.multi_modal_content = {"audio": (b"test", 16000)}
    mock_request_input.api_url = "http://test.com/transcriptions"

    sse_chunks = [b'data: {"choices": [{"delta": {"content": "test"}}]}\n\n']
    fake_response = MagicMock(status=200)
    fake_response.__aenter__.return_value = fake_response
    fake_response.content.__aiter__.return_value = sse_chunks

    with patch("aiohttp.ClientSession.post", return_value=fake_response):
        output = await async_request_openai_audio(mock_request_input)

    assert output.success is True
def test_async_request_functions_dict():
    """Check that ``ASYNC_REQUEST_FUNCS`` has at least eight entries, including the expected backends.

    The checks are plain synchronous assertions, so the test is an ordinary
    function and does not depend on the asyncio pytest plugin.
    """
    assert len(ASYNC_REQUEST_FUNCS) >= 8
    expected_backends = (
        "tgi",
        "openai-chat",
        "openai",
        "tensorrt-llm",
        "deepspeed-mii",
        "openai-audio",
    )
    for backend in expected_backends:
        assert backend in ASYNC_REQUEST_FUNCS
def test_openai_compatible_backends():
    """Check that ``OPENAI_COMPATIBLE_BACKENDS`` has at least two entries, including the expected ones.

    The checks are plain synchronous assertions, so the test is an ordinary
    function and does not depend on the asyncio pytest plugin.
    """
    assert len(OPENAI_COMPATIBLE_BACKENDS) >= 2
    for backend in ("openai-chat", "vllm"):
        assert backend in OPENAI_COMPATIBLE_BACKENDS
def test_request_func_output_defaults():
    """Check the default field values of a freshly constructed ``RequestFuncOutput``.

    The checks are plain synchronous assertions, so the test is an ordinary
    function and does not depend on the asyncio pytest plugin.
    """
    output = RequestFuncOutput()
    assert output.no == 0
    assert output.generated_text == ""
    assert output.success is False
    assert output.latency == 0.0

View File

@@ -0,0 +1,104 @@
import json
import os
import tempfile
import unittest
from unittest.mock import MagicMock, patch
from fastdeploy.benchmarks.lib import utils
class TestConvertToPytorchBenchmarkFormat(unittest.TestCase):
    """Tests for ``utils.convert_to_pytorch_benchmark_format``."""

    def test_empty_metrics(self):
        """No metrics yields an empty record list."""
        args = MagicMock()
        args.model = "test_model"

        records = utils.convert_to_pytorch_benchmark_format(args, {}, {})

        self.assertEqual(records, [])

    def test_with_metrics_no_save_env(self):
        """Metrics yield no records when the save switch reads as false."""
        args = MagicMock()
        args.model = "test_model"
        args.tensor_parallel_size = 2
        metrics = {"latency": [100, 200]}
        extra_info = {"batch_size": 32}

        # The module's ``os`` is replaced so that environ.get() returns False.
        with patch.dict(os.environ, {"SAVE_TO_PYTORCH_BENCHMARK_FORMAT": "False"}), patch.object(
            utils, "os"
        ) as mock_os:
            mock_os.environ.get.return_value = False
            records = utils.convert_to_pytorch_benchmark_format(args, metrics, extra_info)

        self.assertEqual(records, [])

    def test_with_metrics_and_save_env(self):
        """With the switch set to "True", one record per metric is produced."""
        args = MagicMock()
        args.model = "test_model"
        args.tensor_parallel_size = 2
        metrics = {"latency": [100, 200]}
        extra_info = {"batch_size": 32}

        with patch.dict(os.environ, {"SAVE_TO_PYTORCH_BENCHMARK_FORMAT": "True"}):
            records = utils.convert_to_pytorch_benchmark_format(args, metrics, extra_info)

        self.assertEqual(len(records), 1)
        first = records[0]
        self.assertEqual(first["model"]["name"], "test_model")
        self.assertEqual(first["metric"]["name"], "latency")
        self.assertEqual(first["metric"]["benchmark_values"], [100, 200])
class TestInfEncoder(unittest.TestCase):
    """Tests for ``utils.InfEncoder.clear_inf``."""

    def test_clear_inf_with_dict(self):
        """Infinite dict values become the string "inf"; other values are kept."""
        cleaned = utils.InfEncoder().clear_inf({"a": float("inf"), "b": 1.0})
        self.assertEqual(cleaned, {"a": "inf", "b": 1.0})

    def test_clear_inf_with_list(self):
        """Infinite list items become the string "inf"; other items are kept."""
        cleaned = utils.InfEncoder().clear_inf([float("inf"), 1.0])
        self.assertEqual(cleaned, ["inf", 1.0])

    def test_clear_inf_with_other_types(self):
        """Strings, ints and None come back unchanged."""
        encoder = utils.InfEncoder()
        for value in ("test", 123, None):
            self.assertEqual(encoder.clear_inf(value), value)
class TestWriteToJson(unittest.TestCase):
    """Round-trip tests for ``utils.write_to_json``."""

    def _write_and_reload(self, records):
        """Write ``records`` to a temporary file, read it back as JSON, and delete the file."""
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as handle:
            path = handle.name
        try:
            utils.write_to_json(path, records)
            with open(path, "r") as f:
                return json.load(f)
        finally:
            # Remove the temporary file whether or not the round trip worked.
            if os.path.exists(path):
                os.remove(path)

    def test_write_to_json(self):
        """Plain records are read back unchanged."""
        test_data = [{"key": "value"}, {"key2": 123}]
        self.assertEqual(self._write_and_reload(test_data), test_data)

    def test_write_to_json_with_inf(self):
        """An infinite float is read back as the string "inf"."""
        test_data = [{"key": float("inf")}]
        self.assertEqual(self._write_and_reload(test_data), [{"key": "inf"}])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,151 @@
import io
import json
from argparse import ArgumentParser, Namespace
import pytest
from PIL import Image
import fastdeploy.benchmarks.datasets as bd
class DummyTokenizer:
    """Minimal tokenizer stand-in exposing only the members the dataset tests use."""

    # Size of the fake vocabulary.
    vocab_size = 100

    def num_special_tokens_to_add(self):
        """Return 1."""
        return 1

    def decode(self, ids):
        """Return the fixed string "dummy_text" regardless of ``ids``."""
        return "dummy_text"

    def encode(self, text, add_special_tokens=False):
        """Return the ids ``0 .. len(text) - 1``; ``add_special_tokens`` is ignored."""
        return [*range(len(text))]
def make_temp_json(tmp_path, content):
    """Write each item of ``content`` as one JSON line to ``tmp_path / "data.json"``.

    Returns:
        The path of the written file as a string.
    """
    target = tmp_path / "data.json"
    with open(target, "w", encoding="utf-8") as handle:
        handle.writelines(json.dumps(record) + "\n" for record in content)
    return str(target)
def test_is_valid_sequence_variants():
    """Exercise ``bd.is_valid_sequence`` with accepted and rejected length combinations."""
    assert bd.is_valid_sequence(10, 10)

    rejected_cases = [
        ((1, 10), {}),  # prompt too short
        ((10, 1), {}),  # output too short
        ((2000, 10), {"max_prompt_len": 100}),
        ((2000, 100), {"max_total_len": 200}),
    ]
    for positional, keywords in rejected_cases:
        assert not bd.is_valid_sequence(*positional, **keywords)

    # The short output is accepted once the minimum-output check is skipped.
    assert bd.is_valid_sequence(10, 1, skip_min_output_len_check=True)
def test_process_image_with_pil_and_str(tmp_path):
    """Exercise ``bd.process_image`` with a bytes dict, a PIL image, string inputs and an invalid type."""
    red_square = Image.new("RGB", (10, 10), color="red")

    # Dict input carrying raw PNG bytes.
    png_buffer = io.BytesIO()
    red_square.save(png_buffer, format="PNG")
    from_bytes = bd.process_image({"bytes": png_buffer.getvalue()})
    assert "image_url" in from_bytes

    # PIL image input.
    from_pil = bd.process_image(red_square)
    assert from_pil["type"] == "image_url"
    assert from_pil["image_url"]["url"].startswith("data:image/jpeg;base64,")

    # String inputs: a plain path and an http URL.
    from_path = bd.process_image("path/to/file")
    assert from_path["image_url"]["url"].startswith("file://")
    from_url = bd.process_image("http://abc.com/img.png")
    assert from_url["image_url"]["url"].startswith("http://")

    # Any other input type is rejected.
    with pytest.raises(ValueError):
        bd.process_image(123)
def test_maybe_oversample_requests(caplog):
    """A single request is oversampled in place until at least three are present."""
    seed_request = bd.SampleRequest(1, "a", [], None, 10, 20)
    requests = [seed_request]

    bd.RandomDataset().maybe_oversample_requests(requests, 3)

    assert len(requests) >= 3
def test_EBDataset_and_EBChatDataset(tmp_path):
    """Sample two requests from ``EBDataset`` and ``EBChatDataset`` built from one-record files."""

    def assert_valid(samples):
        # Every sample must be a SampleRequest that carries its JSON payload.
        assert all(isinstance(s, bd.SampleRequest) for s in samples)
        assert all(s.json_data is not None for s in samples)

    eb_record = {
        "text": "hello",
        "temperature": 0.7,
        "penalty_score": 1.0,
        "frequency_score": 1.0,
        "presence_score": 1.0,
        "topp": 0.9,
        "input_token_num": 5,
        "max_dec_len": 10,
    }
    eb_file = make_temp_json(tmp_path, [eb_record])
    eb = bd.EBDataset(dataset_path=eb_file, shuffle=True)
    assert_valid(eb.sample(2))

    chat_record = {"messages": [{"role": "user", "content": "hi"}], "max_tokens": 20}
    chat_file = make_temp_json(tmp_path, [chat_record])
    chat = bd.EBChatDataset(dataset_path=chat_file, shuffle=True)
    assert_valid(chat.sample(2, enable_multimodal_chat=False))
def test_RandomDataset_sample():
    """``RandomDataset.sample`` returns the requested count and rejects ``range_ratio`` of 1.0."""
    tokenizer = DummyTokenizer()
    dataset = bd.RandomDataset(random_seed=123)

    drawn = dataset.sample(tokenizer, 2, prefix_len=2, range_ratio=0.1)
    assert len(drawn) == 2
    for request in drawn:
        assert isinstance(request, bd.SampleRequest)

    # range_ratio >= 1 should raise
    with pytest.raises(AssertionError):
        dataset.sample(tokenizer, 1, range_ratio=1.0)
def test__ValidateDatasetArgs_and_get_samples(tmp_path):
    """Check the dataset-path validation action and ``bd.get_samples`` for known and unknown datasets."""
    # Invalid combination: the default "random" dataset together with --dataset-path.
    parser = ArgumentParser()
    parser.add_argument("--dataset-name", default="random")
    parser.add_argument("--dataset-path", action=bd._ValidateDatasetArgs)
    with pytest.raises(SystemExit):
        parser.parse_args(["--dataset-path", "abc.json"])

    # get_samples with an EBChat file holding one multi-turn conversation.
    conversation = {
        "messages": [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi there"},
            {"role": "user", "content": "how are you?"},
        ],
        "max_tokens": 10,
    }
    chat_file = make_temp_json(tmp_path, [conversation])
    args = Namespace(
        dataset_name="EBChat",
        dataset_path=chat_file,
        seed=0,
        shuffle=False,
        num_prompts=1,
        sharegpt_output_len=10,
    )
    assert isinstance(bd.get_samples(args), list)

    # An unknown dataset name is rejected.
    args.dataset_name = "unknown"
    with pytest.raises(ValueError):
        bd.get_samples(args)
def test_add_dataset_parser():
    """After ``bd.add_dataset_parser``, parsing no arguments yields ``seed`` and ``num_prompts``."""
    parser = bd.FlexibleArgumentParser()
    bd.add_dataset_parser(parser)

    parsed = parser.parse_args([])

    for attribute in ("seed", "num_prompts"):
        assert hasattr(parsed, attribute)

View File

@@ -0,0 +1,102 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import unittest
from unittest.mock import MagicMock, patch
import numpy as np
from fastdeploy.benchmarks.latency import add_cli_args, main
class TestLatency(unittest.TestCase):
    """Tests for the latency benchmark's argument parser and ``main`` entry point."""

    @staticmethod
    def _stub_llm(mock_llm):
        """Make ``mock_llm()`` return an instance whose engine config reports ``max_model_len`` 2048."""
        instance = MagicMock()
        mock_llm.return_value = instance
        cfg = MagicMock()
        cfg.max_model_len = 2048
        instance.llm_engine.cfg = cfg
        return instance

    @staticmethod
    def _parsed_args(**extra):
        """Parse an empty command line and overlay the values the tests rely on."""
        parser = argparse.ArgumentParser()
        add_cli_args(parser)
        args = parser.parse_args([])
        overrides = {
            "input_len": 32,
            "output_len": 128,
            "batch_size": 8,
            "n": 1,
            "num_iters_warmup": 2,
            "num_iters": 3,
            **extra,
            "model": "test_model",
            "served_model_name": "test_model",
            "tokenizer": "test_tokenizer",
        }
        for key, value in overrides.items():
            setattr(args, key, value)
        return args

    def test_add_cli_args(self):
        """The parser defaults are 32 / 128 / 8 for input length, output length and batch size."""
        parser = argparse.ArgumentParser()
        add_cli_args(parser)
        parsed = parser.parse_args([])
        self.assertEqual(parsed.input_len, 32)
        self.assertEqual(parsed.output_len, 128)
        self.assertEqual(parsed.batch_size, 8)

    @patch("fastdeploy.LLM")
    @patch("numpy.random.randint")
    @patch("tqdm.tqdm")
    def test_main(self, mock_tqdm, mock_randint, mock_llm):
        """``main`` constructs the LLM once and calls ``generate`` on it."""
        # Setup mocks
        llm_instance = self._stub_llm(mock_llm)
        mock_randint.return_value = np.zeros((8, 32))
        mock_tqdm.return_value = range(10)

        # Run test
        main(self._parsed_args())

        # Verify calls
        mock_llm.assert_called_once()
        llm_instance.generate.assert_called()

    @patch("fastdeploy.LLM")
    @patch("sys.exit")
    def test_main_profile_error(self, mock_exit, mock_llm):
        """With ``profile`` off, ``main`` runs without calling ``sys.exit``."""
        self._stub_llm(mock_llm)

        main(self._parsed_args(profile=False))

        mock_exit.assert_not_called()  # Since profile=False, exit should not be called
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,397 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import unittest
from unittest import IsolatedAsyncioTestCase
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastdeploy.benchmarks.serve import (
BenchmarkMetrics,
add_cli_args,
benchmark,
calculate_metrics,
check_goodput_args,
convert_to_pytorch_benchmark_format,
get_request,
save_to_pytorch_benchmark_format,
write_to_json,
)
class TestServe(IsolatedAsyncioTestCase):
def test_add_cli_args(self):
    """Parsing only ``--model`` yields the expected backend, host, port and model."""
    parser = argparse.ArgumentParser()
    add_cli_args(parser)
    parsed = parser.parse_args(["--model", "test_model"])

    expected = {
        "backend": "openai-chat",
        "host": "127.0.0.1",
        "port": 8000,
        "model": "test_model",
    }
    for attribute, value in expected.items():
        self.assertEqual(getattr(parsed, attribute), value)
def test_benchmark_metrics_init(self):
    """``BenchmarkMetrics`` accepts every field and exposes the totals it was given."""
    # (field suffix, mean and median value, std value, 99th-percentile value)
    stat_families = [
        ("s_decode", 0.5, 0.1, 0.6),
        ("ttft_ms", 100.0, 10.0, 110.0),
        ("s_ttft_ms", 90.0, 9.0, 100.0),
        ("tpot_ms", 50.0, 5.0, 60.0),
        ("itl_ms", 20.0, 2.0, 25.0),
        ("s_itl_ms", 18.0, 1.8, 20.0),
        ("e2el_ms", 500.0, 50.0, 600.0),
        ("s_e2el_ms", 450.0, 45.0, 500.0),
        ("input_len", 10.0, 1.0, 12.0),
        ("s_input_len", 9.0, 0.9, 10.0),
        ("output_len", 20.0, 2.0, 25.0),
    ]
    fields = {
        "completed": 10,
        "total_input": 100,
        "total_output": 200,
        "request_throughput": 5.0,
        "request_goodput": 4.0,
        "output_throughput": 10.0,
        "total_token_throughput": 15.0,
    }
    for suffix, center, spread, p99 in stat_families:
        fields[f"mean_{suffix}"] = center
        fields[f"median_{suffix}"] = center
        fields[f"std_{suffix}"] = spread
        fields[f"percentiles_{suffix}"] = [(99, p99)]

    metrics = BenchmarkMetrics(**fields)

    self.assertEqual(metrics.completed, 10)
    self.assertEqual(metrics.total_input, 100)
    self.assertEqual(metrics.total_output, 200)
def test_calculate_metrics(self):
    """One successful request gives one completion and matching input/output totals."""
    from fastdeploy.benchmarks.datasets import SampleRequest
    from fastdeploy.benchmarks.lib.endpoint_request_func import RequestFuncOutput

    request = SampleRequest(
        no=1, prompt="test1", prompt_len=10, expected_output_len=20, history_QA=[], json_data=None
    )
    response = RequestFuncOutput(
        success=True,
        prompt_len=10,
        prompt_tokens=10,
        output_tokens=20,
        ttft=0.1,
        itl=[0.02, 0.02, 0.02],
        latency=0.5,
        arrival_time=[0, 0.1, 0.12, 0.14, 0.16],
        generated_text="test output",
        reasoning_content=None,
        error=None,
    )

    metrics, _ = calculate_metrics(
        input_requests=[request],
        outputs=[response],
        dur_s=1.0,
        selected_percentiles=[99],
        goodput_config_dict={},
    )

    self.assertEqual(metrics.completed, 1)
    self.assertEqual(metrics.total_input, 10)
    self.assertEqual(metrics.total_output, 20)
@pytest.mark.asyncio
@patch("fastdeploy.benchmarks.serve.get_request")
@patch("asyncio.gather", new_callable=AsyncMock)
async def test_benchmark(self, mock_gather, mock_get_request):
    """Run ``benchmark`` against a temporary fake backend and check the reported input-token total.

    The fake backend is registered through ``patch.dict`` so that the shared
    ``ASYNC_REQUEST_FUNCS`` registry is restored when the test ends instead
    of keeping a stray "test_backend" entry for later tests.
    """
    from fastdeploy.benchmarks.datasets import SampleRequest
    from fastdeploy.benchmarks.serve import ASYNC_REQUEST_FUNCS

    def make_request():
        return SampleRequest(
            no=1, prompt="test", prompt_len=10, expected_output_len=20, history_QA=[], json_data=None
        )

    # Async generator standing in for get_request.
    async def mock_request_gen():
        yield make_request()

    mock_get_request.return_value = mock_request_gen()

    mock_func = AsyncMock()
    mock_func.return_value = MagicMock(
        success=True,
        prompt_len=10,
        prompt_tokens=10,
        output_tokens=20,
        ttft=0.1,
        itl=[0.02, 0.02, 0.02],
        latency=0.5,
        arrival_time=[0, 0.1, 0.12, 0.14, 0.16],
        generated_text="test output",
        reasoning_content=None,
        error=None,
    )

    # Register the fake backend only for the duration of the call.
    with patch.dict(ASYNC_REQUEST_FUNCS, {"test_backend": mock_func}):
        result = await benchmark(
            backend="test_backend",
            api_url="http://test",
            base_url="http://test",
            model_id="test_model",
            model_name="test_model",
            input_requests=[make_request()],
            hyper_parameters={},
            logprobs=None,
            request_rate=1.0,
            burstiness=1.0,
            disable_tqdm=True,
            profile=False,
            selected_percentile_metrics=["ttft", "tpot", "itl"],
            selected_percentiles=[99],
            ignore_eos=False,
            debug=False,
            goodput_config_dict={},
            max_concurrency=None,
            lora_modules=None,
            extra_body=None,
        )

    self.assertEqual(result["total_input_tokens"], 0)
@pytest.mark.asyncio
@patch("asyncio.sleep", new_callable=AsyncMock)
async def test_get_request(self, mock_sleep):
    """``get_request`` yields both requests at an infinite rate and at a finite rate."""
    from fastdeploy.benchmarks.datasets import SampleRequest

    specs = [(1, "test1", 10, 20), (2, "test2", 15, 25)]
    input_requests = [
        SampleRequest(
            no=no, prompt=prompt, prompt_len=prompt_len, expected_output_len=out_len, history_QA=[], json_data=None
        )
        for no, prompt, prompt_len, out_len in specs
    ]

    async def count_up_to_two(stream):
        """Consume ``stream`` until two items have been seen and return the count."""
        seen = 0
        async for _ in stream:
            seen += 1
            if seen >= 2:
                break
        return seen

    # Test infinite request rate
    self.assertEqual(await count_up_to_two(get_request(input_requests, float("inf"))), 2)

    # Test finite request rate
    mock_sleep.return_value = None
    self.assertEqual(await count_up_to_two(get_request(input_requests, 1.0, 1.0)), 2)
    mock_sleep.assert_called()
def test_check_goodput_args(self):
    """Valid ``name:value`` goodput specs are parsed; an unknown metric name raises ``ValueError``."""

    # Test valid goodput args
    class Args:
        goodput = ["ttft:100", "tpot:50"]

    parsed = check_goodput_args(Args())
    for metric, threshold in (("ttft", 100), ("tpot", 50)):
        self.assertEqual(parsed[metric], threshold)

    # Test invalid goodput args
    class InvalidArgs:
        goodput = ["invalid:100"]

    with self.assertRaises(ValueError):
        check_goodput_args(InvalidArgs())
@patch("os.environ.get", return_value="1")
def test_convert_to_pytorch_benchmark_format(self, mock_env):
    """Convert two metrics and check the record count and the model name.

    ``os.environ.get`` is patched to always return ``"1"`` for the duration
    of the test.
    """

    class Args:
        model = "test_model"

    metrics = {name: [100.0] for name in ("mean_ttft_ms", "median_ttft_ms")}
    extra_info = {"tensor_parallel_size": 1}

    converted = convert_to_pytorch_benchmark_format(Args(), metrics, extra_info)

    self.assertEqual(len(converted), 2)
    first_record = converted[0]
    self.assertEqual(first_record["model"]["name"], "test_model")
@patch("builtins.open", new_callable=MagicMock)
@patch("json.dump")
def test_write_to_json(self, mock_dump, mock_open):
    """``write_to_json`` must serialise the records with exactly one ``json.dump`` call."""
    # Both ``open`` and ``json.dump`` are mocked, so nothing touches the disk.
    write_to_json("test.json", [{"test": "data"}])
    mock_dump.assert_called_once()
@patch("os.environ.get", return_value="1")
@patch("builtins.open", new_callable=MagicMock)
@patch("json.dump")
def test_save_to_pytorch_benchmark_format(self, mock_dump, mock_open, mock_env):
    """Saving TTFT/TPOT/ITL statistics must end in exactly one ``json.dump`` call."""

    class Args:
        model = "test_model"

    # Mean / median / std / p99 values for each latency metric, in milliseconds
    # as the key suffixes indicate.
    latency_stats = dict(
        mean_ttft_ms=100.0,
        median_ttft_ms=100.0,
        std_ttft_ms=10.0,
        p99_ttft_ms=110.0,
        mean_tpot_ms=50.0,
        median_tpot_ms=50.0,
        std_tpot_ms=5.0,
        p99_tpot_ms=60.0,
        median_itl_ms=20.0,
        mean_itl_ms=20.0,
        std_itl_ms=2.0,
        p99_itl_ms=25.0,
    )

    save_to_pytorch_benchmark_format(Args(), latency_stats, "test.json")
    mock_dump.assert_called_once()
@pytest.mark.asyncio
@patch("builtins.open", new_callable=MagicMock)
@patch("yaml.safe_load")
@patch("fastdeploy.benchmarks.serve.benchmark", new_callable=AsyncMock)
@patch("fastdeploy.benchmarks.serve.get_samples", new_callable=MagicMock)
@patch("fastdeploy.benchmarks.serve.add_cli_args")
@patch("argparse.ArgumentParser.parse_args")
async def test_main_async(
    self, mock_parse_args, mock_add_cli_args, mock_get_samples, mock_benchmark, mock_safe_load, mock_open
):
    """Run ``main_async`` with every collaborator mocked and inspect what gets dumped.

    Sample loading, the benchmark coroutine, YAML parsing, ``open`` and
    ``json.dump`` are all replaced by mocks; the test then checks that the
    hyper-parameter file was opened and that a dict containing the benchmark
    results was passed to ``json.dump``.
    """
    from fastdeploy.benchmarks.datasets import SampleRequest
    from fastdeploy.benchmarks.serve import main_async

    # Argument object handed to main_async.
    mock_args = MagicMock()
    mock_args.configure_mock(
        backend="openai-chat",  # openai-compatible backend
        model="test_model",
        request_rate=float("inf"),
        burstiness=1.0,
        disable_tqdm=True,
        profile=False,
        ignore_eos=False,
        debug=False,
        max_concurrency=None,
        lora_modules=None,
        extra_body=None,
        percentile_metrics="ttft,tpot,itl",
        metric_percentiles="99",
        goodput=None,
        ramp_up_strategy="1",
        ramp_up_start_rps=1,
        ramp_up_end_rps=1,
        dataset_name="EB",
        dataset_path=MagicMock(),
        dataset_split=None,
        dataset_sample_ratio=1.0,
        dataset_shard_size=None,
        dataset_shard_rank=None,
        dataset_shuffle_seed=None,
        top_p=0.9,  # sampling parameters for the openai-compatible backend
        top_k=50,
        temperature=0.7,
        result_dir=MagicMock(),
        result_filename=MagicMock(),
        save_result=True,  # enable result saving so json.dump is reached
        save_detailed=False,
        append_result=False,
        hyperparameter_path="test_params.yaml",
    )
    mock_parse_args.return_value = mock_args

    # YAML loading yields a small hyper-parameter mapping.
    hyper_parameters = {"param1": "value1", "param2": 42}
    mock_safe_load.return_value = dict(hyper_parameters)

    # File handle returned by the mocked ``open`` context manager.
    file_handle = MagicMock()
    file_handle.tell.return_value = 100  # report a non-zero file position
    mock_open.return_value.__enter__.return_value = file_handle

    mock_get_samples.return_value = [
        SampleRequest(no=1, prompt="test", prompt_len=10, expected_output_len=20, history_QA=[], json_data=None)
    ]

    # Benchmark result made only of JSON-serialisable values.
    serialised_request = {
        "no": 1,
        "prompt": "test",
        "prompt_len": 10,
        "expected_output_len": 20,
        "history_QA": [],
        "json_data": None,
    }
    mock_benchmark.return_value = {
        "completed": 1,
        "total_input_tokens": 10,
        "total_output_tokens": 20,
        "request_throughput": 1.0,
        "mean_ttft_ms": 100.0,
        "median_ttft_ms": 100.0,
        "std_ttft_ms": 10.0,
        "p99_ttft_ms": 110.0,
        "mean_tpot_ms": 50.0,
        "median_tpot_ms": 50.0,
        "std_tpot_ms": 5.0,
        "p99_tpot_ms": 60.0,
        "median_itl_ms": 20.0,
        "mean_itl_ms": 20.0,
        "std_itl_ms": 2.0,
        "p99_itl_ms": 25.0,
        "hyper_parameters": dict(hyper_parameters),
        "input_requests": [serialised_request],
    }

    with patch("json.dump") as mock_json_dump:
        await main_async(mock_args)

        # Samples were requested exactly once.
        mock_get_samples.assert_called_once()

        # The hyper-parameter YAML file was opened and parsed.
        mock_open.assert_any_call("test_params.yaml", "r")
        mock_safe_load.assert_called_once()

        # Exactly one dump, whose first positional argument is the result dict.
        mock_json_dump.assert_called_once()
        dumped = mock_json_dump.call_args.args[0]
        self.assertIsInstance(dumped, dict)
        self.assertIn("completed", dumped)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,485 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import unittest
from unittest.mock import MagicMock, patch
try:
import torch
TORCH_AVAILABLE = True
except (ImportError, NameError, AttributeError, OSError):
TORCH_AVAILABLE = False
from fastdeploy.benchmarks.datasets import SampleRequest
from fastdeploy.benchmarks.throughput import (
EngineArgs,
add_cli_args,
get_requests,
main,
run_fd,
run_fd_chat,
run_hf,
validate_args,
)
class TestThroughput(unittest.TestCase):
    """Tests for the helpers imported from ``fastdeploy.benchmarks.throughput``.

    The engine (``fastdeploy.LLM``), the HuggingFace loaders and the
    ``run_*`` / ``get_requests`` functions are patched where a test needs
    them. The repeated set-up (a sample request, parser-default arguments,
    the full ``main`` argument set) lives in the private helpers below.
    """

    # Attributes every ``main`` test assigns on top of the parser defaults;
    # individual tests override single entries via ``_main_args``.
    _MAIN_DEFAULTS = {
        "backend": "fastdeploy",
        "dataset_name": "random",
        "dataset_path": None,
        "seed": 42,
        "input_len": 10,
        "output_len": 20,
        "num_prompts": 1,
        "tokenizer": "test_tokenizer",
        "model": "test_model",
        "n": 1,
        "hf_max_batch_size": None,
        "trust_remote_code": False,
        "output_json": None,
        "disable_detokenize": False,
        "tensor_parallel_size": 1,
    }

    @staticmethod
    def _sample(prompt="test", no=1, prompt_len=10, expected_output_len=20):
        """Build a ``SampleRequest`` with empty history and no JSON payload."""
        return SampleRequest(
            no=no,
            prompt=prompt,
            prompt_len=prompt_len,
            expected_output_len=expected_output_len,
            history_QA=[],
            json_data=None,
        )

    @staticmethod
    def _cli_args(**attrs):
        """Parse an empty command line with ``add_cli_args`` and set *attrs* on the result."""
        parser = argparse.ArgumentParser()
        add_cli_args(parser)
        args = parser.parse_args([])
        for name, value in attrs.items():
            setattr(args, name, value)
        return args

    @classmethod
    def _main_args(cls, **overrides):
        """Return parser defaults plus ``_MAIN_DEFAULTS``, with *overrides* applied last."""
        return cls._cli_args(**{**cls._MAIN_DEFAULTS, **overrides})

    @staticmethod
    def _install_fake_llm(mock_llm):
        """Make the patched ``LLM`` class return an instance whose cfg reports max_model_len=2048."""
        instance = MagicMock()
        mock_llm.return_value = instance
        cfg = MagicMock()
        cfg.max_model_len = 2048
        instance.llm_engine.cfg = cfg
        return instance

    @patch("fastdeploy.LLM")
    def test_run_fd(self, mock_llm):
        """``run_fd`` returns a float elapsed time and the outputs produced by ``generate``."""
        llm = self._install_fake_llm(mock_llm)
        llm.generate.return_value = ["output1", "output2"]

        requests = [self._sample(prompt="test prompt")]
        engine_args = EngineArgs(model="test_model")
        elapsed_time, outputs = run_fd(requests, n=1, engine_args=engine_args)

        self.assertIsInstance(elapsed_time, float)
        self.assertEqual(len(outputs), 2)

    @patch("fastdeploy.LLM")
    def test_run_fd_chat(self, mock_llm):
        """``run_fd_chat`` returns a float elapsed time and the outputs produced by ``chat``."""
        llm = self._install_fake_llm(mock_llm)
        llm.chat.return_value = ["chat output1", "chat output2"]

        requests = [self._sample(prompt="test chat prompt")]
        engine_args = EngineArgs(model="test_model")
        elapsed_time, outputs = run_fd_chat(requests, n=1, engine_args=engine_args)

        self.assertIsInstance(elapsed_time, float)
        self.assertEqual(len(outputs), 2)

    @unittest.skipIf(not TORCH_AVAILABLE, "PyTorch is not available")
    @patch("transformers.AutoModelForCausalLM.from_pretrained")
    @patch("transformers.AutoTokenizer.from_pretrained")
    def test_run_hf(self, mock_tokenizer, mock_model):
        """``run_hf`` returns a float elapsed time when model and tokenizer loading are mocked."""
        mock_model_instance = MagicMock()
        mock_model.return_value = mock_model_instance
        # The skipIf above guarantees torch is importable whenever this body runs.
        mock_model_instance.generate.return_value = torch.tensor([[1, 2, 3]])

        mock_tokenizer_instance = MagicMock()
        mock_tokenizer.return_value = mock_tokenizer_instance
        mock_tokenizer_instance.pad_token = "pad"

        elapsed_time = run_hf(
            [self._sample(prompt="test hf prompt")],
            model="test_model",
            tokenizer=mock_tokenizer_instance,
            n=1,
            max_batch_size=4,
            trust_remote_code=True,
        )
        self.assertIsInstance(elapsed_time, float)

    @patch("fastdeploy.benchmarks.datasets.RandomDataset")
    def test_get_requests(self, mock_dataset):
        """``get_requests`` returns two requests for the random dataset with ``num_prompts=2``."""
        mock_dataset_instance = MagicMock()
        mock_dataset.return_value = mock_dataset_instance
        mock_dataset_instance.sample.return_value = [
            self._sample(prompt="test1"),
            self._sample(prompt="test2", no=2, prompt_len=15, expected_output_len=25),
        ]

        args = argparse.Namespace(
            dataset_name="random",
            dataset_path=None,
            seed=42,
            input_len=10,
            output_len=20,
            num_prompts=2,
            hf_max_batch_size=4,
            lora_path=None,
            random_range_ratio=0.0,
            prefix_len=0,
        )
        tokenizer = MagicMock()
        tokenizer.vocab_size = 10000  # a reasonable vocabulary size
        tokenizer.num_special_tokens_to_add.return_value = 0  # number of special tokens

        requests = get_requests(args, tokenizer)
        self.assertEqual(len(requests), 2)

    def test_validate_args(self):
        """With ``tokenizer=None``, ``validate_args`` leaves ``args.tokenizer`` equal to the model name."""
        args = argparse.Namespace(
            backend="fastdeploy",
            dataset_name="random",
            dataset=None,
            dataset_path=None,
            input_len=10,
            output_len=20,
            tokenizer=None,
            model="test_model",
            hf_max_batch_size=None,
            trust_remote_code=False,
            quantization=None,
        )
        validate_args(args)
        self.assertEqual(args.tokenizer, "test_model")

    def test_add_cli_args(self):
        """Parser defaults: backend ``fastdeploy`` and dataset ``random``."""
        args = self._cli_args()
        self.assertEqual(args.backend, "fastdeploy")
        self.assertEqual(args.dataset_name, "random")

    @patch("fastdeploy.benchmarks.throughput.run_fd")
    @patch("fastdeploy.benchmarks.throughput.get_requests")
    @patch("transformers.AutoTokenizer.from_pretrained")
    def test_main_fastdeploy(self, mock_tokenizer, mock_get_requests, mock_run_fd):
        """``main`` prints something for the ``fastdeploy`` backend."""
        mock_get_requests.return_value = [self._sample()]
        mock_run_fd.return_value = (1.0, ["output1", "output2"])
        args = self._main_args()

        with patch("builtins.print") as mock_print:
            main(args)
        mock_print.assert_called()

    @unittest.skipIf(not TORCH_AVAILABLE, "PyTorch is not available")
    @patch("fastdeploy.benchmarks.throughput.run_hf")
    @patch("fastdeploy.benchmarks.throughput.get_requests")
    @patch("transformers.AutoTokenizer.from_pretrained")
    @patch("transformers.AutoModelForCausalLM.from_pretrained")
    def test_main_hf(self, mock_model, mock_tokenizer, mock_get_requests, mock_run_hf):
        """``main`` prints something for the ``hf`` backend."""
        mock_get_requests.return_value = [self._sample()]
        mock_run_hf.return_value = 1.0
        args = self._main_args(backend="hf", hf_max_batch_size=4, trust_remote_code=True)

        with patch("builtins.print") as mock_print:
            main(args)
        mock_print.assert_called()

    @patch("fastdeploy.benchmarks.throughput.run_fd_chat")
    @patch("fastdeploy.benchmarks.throughput.get_requests")
    @patch("transformers.AutoTokenizer.from_pretrained")
    def test_main_fastdeploy_chat(self, mock_tokenizer, mock_get_requests, mock_run_fd_chat):
        """``main`` prints something for the ``fastdeploy-chat`` backend."""
        mock_get_requests.return_value = [self._sample()]
        mock_run_fd_chat.return_value = (1.0, ["output1", "output2"])
        args = self._main_args(backend="fastdeploy-chat")

        with patch("builtins.print") as mock_print:
            main(args)
        mock_print.assert_called()

    @patch("builtins.open")
    @patch("json.dump")
    @patch("fastdeploy.benchmarks.throughput.run_fd")
    @patch("fastdeploy.benchmarks.throughput.get_requests")
    def test_main_with_output_json(self, mock_get_requests, mock_run_fd, mock_json_dump, mock_open):
        """With ``output_json`` set, ``main`` calls ``json.dump``."""
        mock_get_requests.return_value = [self._sample()]
        mock_run_fd.return_value = (1.0, ["output1", "output2"])
        args = self._main_args(output_json="output.json")

        main(args)
        mock_json_dump.assert_called()

    # Additional test cases covering previously untested lines.
    # The validate_args tests below have no assertions: they pass as long as
    # validate_args does not raise.

    def test_validate_args_with_lora(self):
        """``validate_args`` accepts LoRA options."""
        args = self._cli_args(
            # The original note said LoRA is vLLM-only, yet the test runs on "fastdeploy".
            backend="fastdeploy",
            dataset_name="random",
            enable_lora=True,
            lora_path="/path/to/lora",
            input_len=10,
            output_len=20,
            tokenizer="test_tokenizer",
            model="test_model",
        )
        validate_args(args)

    def test_validate_args_with_hf_backend(self):
        """``validate_args`` accepts the ``hf`` backend with a max batch size."""
        args = self._cli_args(
            backend="hf",
            dataset_name="random",
            hf_max_batch_size=4,
            input_len=10,
            output_len=20,
            tokenizer="test_tokenizer",
            model="test_model",
        )
        validate_args(args)

    def test_validate_args_with_quantization(self):
        """``validate_args`` accepts ``quantization="w4a8"``."""
        args = self._cli_args(
            backend="fastdeploy",
            dataset_name="random",
            quantization="w4a8",
            input_len=10,
            output_len=20,
            tokenizer="test_tokenizer",
            model="test_model",
        )
        validate_args(args)

    @patch("fastdeploy.benchmarks.throughput.write_to_json")
    @patch("fastdeploy.benchmarks.throughput.convert_to_pytorch_benchmark_format")
    def test_save_to_pytorch_benchmark_format(self, mock_convert, mock_write):
        """When conversion yields a record, ``save_to_pytorch_benchmark_format`` calls ``write_to_json``."""
        from fastdeploy.benchmarks.throughput import save_to_pytorch_benchmark_format

        args = argparse.Namespace(
            output_json="test.json",
            model="test_model",
            input_len=10,
            output_len=20,
            backend="fastdeploy",
        )
        results = {
            "elapsed_time": 1.0,
            "num_requests": 10,
            "total_num_tokens": 100,
            "requests_per_second": 10.0,
            "tokens_per_second": 100.0,
        }
        mock_convert.return_value = [{"metrics": {"requests_per_second": 10.0}}]

        save_to_pytorch_benchmark_format(args, results)
        mock_write.assert_called()

    @patch("fastdeploy.benchmarks.throughput.run_fd")
    @patch("fastdeploy.benchmarks.throughput.get_requests")
    def test_main_with_disable_detokenize(self, mock_get_requests, mock_run_fd):
        """``main`` still prints something when ``disable_detokenize`` is on."""
        mock_get_requests.return_value = [self._sample()]
        mock_run_fd.return_value = (1.0, ["output1", "output2"])
        args = self._main_args(disable_detokenize=True)

        with patch("builtins.print") as mock_print:
            main(args)
        mock_print.assert_called()

    def test_validate_args_with_random_range_ratio(self):
        """``validate_args`` accepts ``random_range_ratio=0.5``."""
        args = self._cli_args(
            backend="fastdeploy",
            dataset_name="random",
            random_range_ratio=0.5,
            input_len=10,
            output_len=20,
            tokenizer="test_tokenizer",
            model="test_model",
        )
        validate_args(args)

    def test_validate_args_with_prefix_len(self):
        """``validate_args`` accepts ``prefix_len=5``."""
        args = self._cli_args(
            backend="fastdeploy",
            dataset_name="random",
            prefix_len=5,
            input_len=10,
            output_len=20,
            tokenizer="test_tokenizer",
            model="test_model",
        )
        validate_args(args)

    def test_validate_args_with_eb_dataset(self):
        """``validate_args`` accepts the ``EB`` dataset with a dataset path."""
        args = self._cli_args(
            backend="fastdeploy",
            dataset_name="EB",
            dataset_path="/path/to/eb",
            tokenizer="test_tokenizer",
            model="test_model",
        )
        validate_args(args)

    def test_validate_args_with_ebchat_dataset(self):
        """``validate_args`` accepts the ``EBChat`` dataset on the chat backend."""
        args = self._cli_args(
            backend="fastdeploy-chat",
            dataset_name="EBChat",
            dataset_path="/path/to/ebchat",
            tokenizer="test_tokenizer",
            model="test_model",
        )
        validate_args(args)

    def test_add_cli_args_with_all_options(self):
        """Every option passed on the command line lands in the matching attribute."""
        parser = argparse.ArgumentParser()
        add_cli_args(parser)
        # Use parse_known_args so unrecognized arguments do not trigger SystemExit.
        args, _ = parser.parse_known_args(
            [
                "--backend",
                "fastdeploy-chat",
                "--dataset-name",
                "EBChat",
                "--dataset-path",
                "/path/to/dataset",
                "--input-len",
                "10",
                "--output-len",
                "20",
                "--n",
                "2",
                "--num-prompts",
                "50",
                "--hf-max-batch-size",
                "4",
                "--output-json",
                "output.json",
                "--disable-detokenize",
                "--lora-path",
                "/path/to/lora",
                "--prefix-len",
                "5",
                "--random-range-ratio",
                "0.5",
            ]
        )
        expected = {
            "backend": "fastdeploy-chat",
            "dataset_name": "EBChat",
            "dataset_path": "/path/to/dataset",
            "input_len": 10,
            "output_len": 20,
            "n": 2,
            "num_prompts": 50,
            "hf_max_batch_size": 4,
            "output_json": "output.json",
            "lora_path": "/path/to/lora",
            "prefix_len": 5,
            "random_range_ratio": 0.5,
        }
        for name, value in expected.items():
            self.assertEqual(getattr(args, name), value)
        self.assertTrue(args.disable_detokenize)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,275 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import unittest
from unittest.mock import MagicMock, patch
import pkg_resources
from fastdeploy.entrypoints.cli.benchmark.eval import (
BenchmarkEvalSubcommand,
_int_or_none_list_arg_type,
try_parse_json,
)
class TestIntOrNoneListArgType(unittest.TestCase):
    """Tests for ``_int_or_none_list_arg_type``.

    Every call uses the same leading arguments ``(3, 4, "1,2,3,4")`` and
    varies only the final comma-separated value string.
    """

    # Leading positional arguments shared by all cases.
    _PREFIX = (3, 4, "1,2,3,4")

    def test_single_value(self):
        """A single value is repeated to fill all four slots."""
        parsed = _int_or_none_list_arg_type(*self._PREFIX, "5")
        self.assertEqual(parsed, [5, 5, 5, 5])

    def test_multiple_values(self):
        """Four values are returned as four integers."""
        parsed = _int_or_none_list_arg_type(*self._PREFIX, "5,6,7,8")
        self.assertEqual(parsed, [5, 6, 7, 8])

    def test_none_value(self):
        """The literal ``None`` becomes a real ``None`` entry."""
        parsed = _int_or_none_list_arg_type(*self._PREFIX, "None,6,None,8")
        self.assertEqual(parsed, [None, 6, None, 8])

    def test_partial_values(self):
        """Three values are padded with the last entry of the defaults string."""
        parsed = _int_or_none_list_arg_type(*self._PREFIX, "5,6,7")
        self.assertEqual(parsed, [5, 6, 7, 4])

    def test_invalid_input(self):
        """Five values raise ``argparse.ArgumentTypeError``."""
        with self.assertRaises(argparse.ArgumentTypeError):
            _int_or_none_list_arg_type(*self._PREFIX, "5,6,7,8,9")
class TestTryParseJson(unittest.TestCase):
    """Tests for ``try_parse_json``."""

    def test_valid_json(self):
        """A JSON object string is decoded into a dict."""
        self.assertEqual(try_parse_json('{"key": "value"}'), {"key": "value"})

    def test_invalid_json(self):
        """A plain string that is not JSON comes back unchanged."""
        raw = "not a json"
        self.assertEqual(try_parse_json(raw), "not a json")

    def test_none_input(self):
        """``None`` is passed through as ``None``."""
        self.assertIsNone(try_parse_json(None))

    def test_invalid_json_with_braces(self):
        """Malformed brace-delimited input raises ``argparse.ArgumentTypeError``."""
        with self.assertRaises(argparse.ArgumentTypeError):
            try_parse_json("{invalid: json}")
class TestBenchmarkEvalSubcommand(unittest.TestCase):
    """Tests for ``BenchmarkEvalSubcommand.add_cli_args`` and ``BenchmarkEvalSubcommand.cmd``.

    ``pkg_resources.get_distribution`` and ``subprocess.run`` are patched, so
    no external package lookup or process launch takes place.
    """

    # Version reported by the mocked distribution in the tests that expect
    # ``cmd`` to reach ``subprocess.run``.
    _ACCEPTED_VERSION = "0.4.9.1"

    @staticmethod
    def _make_args(**overrides):
        """Return the baseline ``cmd`` namespace with *overrides* applied.

        The baseline is the minimal argument set shared by most tests here;
        keyword arguments replace individual fields.
        """
        settings = dict(
            model="hf",
            tasks="test_task",
            model_args="pretrained=test_model",
            batch_size="1",
            output_path=None,
            write_out=False,
            num_fewshot=None,
            max_batch_size=None,
            device=None,
            limit=None,
            samples=None,
            use_cache=None,
            cache_requests=None,
            check_integrity=False,
            log_samples=False,
            system_instruction=None,
            apply_chat_template=False,
            fewshot_as_multiturn=False,
            show_config=False,
            include_path=None,
            verbosity=None,
            wandb_args="",
            wandb_config_args="",
            hf_hub_log_args="",
            predict_only=False,
            seed="0,1234,1234,1234",
            trust_remote_code=False,
            confirm_run_unsafe_code=False,
            metadata=None,
            gen_kwargs=None,
        )
        settings.update(overrides)
        return argparse.Namespace(**settings)

    def setUp(self):
        """Create a parser populated by ``BenchmarkEvalSubcommand.add_cli_args``."""
        self.parser = argparse.ArgumentParser()
        BenchmarkEvalSubcommand.add_cli_args(self.parser)

    def test_add_cli_args(self):
        """``--model`` is parsed into ``args.model``."""
        args = self.parser.parse_args(["--model", "test_model"])
        self.assertEqual(args.model, "test_model")

    @patch("subprocess.run")
    @patch("pkg_resources.get_distribution")
    def test_cmd_basic(self, mock_get_dist, mock_run):
        """With the baseline arguments, ``cmd`` calls ``subprocess.run`` exactly once."""
        mock_get_dist.return_value.version = self._ACCEPTED_VERSION
        mock_run.return_value = MagicMock(returncode=0)

        BenchmarkEvalSubcommand.cmd(self._make_args())
        mock_run.assert_called_once()

    @patch("subprocess.run")
    @patch("pkg_resources.get_distribution")
    def test_cmd_with_complex_args(self, mock_get_dist, mock_run):
        """With every option populated, ``cmd`` still calls ``subprocess.run`` exactly once."""
        mock_get_dist.return_value.version = self._ACCEPTED_VERSION
        mock_run.return_value = MagicMock(returncode=0)

        args = self._make_args(
            model_args='{"pretrained":"test_model","dtype":"float32"}',
            batch_size="auto:32",
            output_path="/tmp/output",
            write_out=True,
            num_fewshot=5,
            max_batch_size=64,
            device="cuda:0",
            limit=0.5,
            samples='{"task1":[1,2,3]}',
            use_cache="/tmp/cache",
            cache_requests="refresh",
            check_integrity=True,
            log_samples=True,
            system_instruction="Test instruction",
            apply_chat_template="template_name",
            fewshot_as_multiturn=True,
            show_config=True,
            include_path="/tmp/include",
            verbosity="DEBUG",
            wandb_args="project=test",
            wandb_config_args="lr=0.01",
            hf_hub_log_args="repo=test",
            predict_only=True,
            seed="1,2,3,4",
            trust_remote_code=True,
            confirm_run_unsafe_code=True,
            metadata='{"max_seq_length":4096}',
            gen_kwargs='{"temperature":0.7}',
        )
        BenchmarkEvalSubcommand.cmd(args)
        mock_run.assert_called_once()

    @patch("subprocess.run", side_effect=FileNotFoundError())
    @patch("pkg_resources.get_distribution")
    def test_cmd_lm_eval_not_found(self, mock_get_dist, mock_run):
        """A ``FileNotFoundError`` from ``subprocess.run`` makes ``cmd`` exit."""
        mock_get_dist.return_value.version = self._ACCEPTED_VERSION
        with self.assertRaises(SystemExit):
            BenchmarkEvalSubcommand.cmd(self._make_args())

    @patch("pkg_resources.get_distribution")
    def test_cmd_wrong_lm_eval_version(self, mock_get_dist):
        """A reported version of ``0.4.8`` makes ``cmd`` exit."""
        mock_get_dist.return_value.version = "0.4.8"
        with self.assertRaises(SystemExit):
            BenchmarkEvalSubcommand.cmd(self._make_args())

    @patch("pkg_resources.get_distribution", side_effect=pkg_resources.DistributionNotFound)
    def test_cmd_lm_eval_not_installed(self, mock_get_dist):
        """``DistributionNotFound`` from the distribution lookup makes ``cmd`` exit."""
        with self.assertRaises(SystemExit):
            BenchmarkEvalSubcommand.cmd(self._make_args())
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,57 @@
"""
Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import unittest
from fastdeploy.entrypoints.cli.benchmark.throughput import (
BenchmarkThroughputSubcommand,
)
class TestBenchmarkThroughputSubcommand(unittest.TestCase):
    """
    Tests for the BenchmarkThroughputSubcommand class.
    """

    def test_add_cli_args(self):
        """Options registered by ``add_cli_args`` parse into the expected attributes."""
        parser = argparse.ArgumentParser()
        BenchmarkThroughputSubcommand.add_cli_args(parser)

        argv = ["--backend", "fastdeploy", "--dataset-name", "random"]
        argv += ["--input-len", "100", "--output-len", "50", "--num-prompts", "10"]
        parsed = parser.parse_args(argv)

        expected = (
            ("backend", "fastdeploy"),
            ("dataset_name", "random"),
            ("input_len", 100),
            ("output_len", 50),
            ("num_prompts", 10),
        )
        for attribute, value in expected:
            self.assertEqual(getattr(parsed, attribute), value)
# When this file is run from the command line, the code below executes the tests.
if __name__ == "__main__":
    unittest.main()

View File

@@ -26,7 +26,7 @@ class TestCollectEnvSubcommand(unittest.TestCase):
"collect-env",
help="Start collecting environment information.",
description="Start collecting environment information.",
usage="vllm collect-env",
usage="fastdeploy collect-env",
)