[Benchmark] Update benchmark (#5496)

* update benchmark

* update benchmark
Zhang Yulong
2025-12-11 11:53:12 +08:00
committed by GitHub
parent 6289cbc434
commit 510b82173a
3 changed files with 148 additions and 0 deletions

View File

@@ -45,6 +45,7 @@ python -m pip install -r requirements.txt
--debug: enable debug mode, printing each request's payload and output; default: False
--shuffle: whether to shuffle the dataset; default: False (no shuffling)
--seed: random seed used when shuffling the dataset; default: 0
--pd-metrics: enable PD-disaggregation metrics collection (adds the request parameter collect_metrics=True); default: False
```
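
As a quick orientation, here is a sketch of what `--pd-metrics` adds to each request; the model name and message are placeholders, and only `collect_metrics` is actually controlled by the flag (it mirrors the payload built in the request function changed below):

```python
# Sketch of a /v1/chat/completions request payload with --pd-metrics set.
# "model" and "messages" are placeholders for illustration; the flag only
# contributes the collect_metrics field.
payload = {
    "model": "your-model-name",
    "messages": [{"role": "user", "content": "hello"}],
    "stream": True,
    "max_tokens": 128,
    "collect_metrics": True,  # added when --pd-metrics is passed
}
```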
##### /v1/chat/completions endpoint stress test: single-sample debugging

View File

@@ -51,6 +51,7 @@ class RequestFuncInput:
ignore_eos: bool = False
language: Optional[str] = None
debug: bool = False
pd_metrics: bool = False
response_format: Optional[dict] = None
random_flag: bool = False
@@ -74,6 +75,73 @@ class RequestFuncOutput:
prompt_len: int = 0
prompt_tokens: int = 0  # number of input tokens reported by the inference side
error: str = ""
metrics: dict = field(default_factory=dict)
def safe_cost(a, b):
"""时间差计算"""
if a is None or b is None:
return None
return a - b
def metrics_summary(metrics, token_timestamps):
"""Summarize metrics"""
if not metrics or len(token_timestamps) < 2:
return {}
m0 = metrics[0]
m_last = metrics[-1]
summary = {}
arrival_time = m0.get("arrival_time")
inference_start_time = m0.get("inference_start_time")
# Total prefill time
summary["prefill_cost_time"] = safe_cost(m0.get("send_request_output_to_decode_time"), arrival_time)
# Prefill preparation time
summary["prefill_prepare_cost_time"] = safe_cost(inference_start_time, arrival_time)
# Preprocessing time
summary["preprocess_cost_time"] = safe_cost(m0.get("scheduler_recv_req_time"), arrival_time)
# Time the request waits in the scheduler cache
summary["cache_in_scheduler_cost_time"] = safe_cost(
m0.get("engine_get_req_time"), m0.get("scheduler_recv_req_time")
)
# Time to acquire decode resources
summary["ask_decode_resource_cost_time"] = safe_cost(
m0.get("ask_decode_resource_finish_time"), m0.get("ask_decode_resource_start_time")
)
# Prefill first-token inference time
summary["prefill_first_token_infer_cost_time"] = safe_cost(
m0.get("engine_recv_first_token_time"), inference_start_time
)
# Time prefill waits for the cache transfer
summary["wait_sending_cache_cost_time"] = safe_cost(
m0.get("send_request_output_to_decode_time"), m0.get("wait_for_sending_cache_time")
)
# Decode resource-preallocation time
summary["decode_preallocate_cost_time"] = safe_cost(
m_last.get("decode_preallocate_req_time"), m_last.get("decode_recv_req_time")
)
# Decode inference-preparation time
summary["decode_prepare_cost_time"] = safe_cost(
m_last.get("decode_inference_start_time"), m_last.get("decode_recv_first_token_time")
)
# Decode second-token inference time
summary["decode_second_token_infer_cost_time"] = safe_cost(
m_last.get("decode_recv_second_token_time"), m_last.get("decode_inference_start_time")
)
# First-token return-path transmission time
summary["first_token_transmission_cost_time"] = safe_cost(
token_timestamps[0], m_last.get("decode_recv_first_token_time")
)
# Second-token return-path transmission time
summary["second_token_transmission_cost_time"] = safe_cost(
token_timestamps[1], m_last.get("decode_recv_second_token_time")
)
return summary
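
To make the field mapping concrete, a minimal sketch of feeding `metrics_summary` by hand; every timestamp below is fabricated, and the key names simply mirror the ones read above:

```python
# Fabricated per-chunk metrics dicts, listing only keys metrics_summary reads.
first_chunk = {  # reported by the prefill side
    "arrival_time": 100.000,
    "scheduler_recv_req_time": 100.002,
    "engine_get_req_time": 100.004,
    "inference_start_time": 100.010,
    "ask_decode_resource_start_time": 100.012,
    "ask_decode_resource_finish_time": 100.020,
    "engine_recv_first_token_time": 100.050,
    "wait_for_sending_cache_time": 100.052,
    "send_request_output_to_decode_time": 100.060,
}
last_chunk = {  # reported by the decode side
    "decode_recv_req_time": 100.061,
    "decode_preallocate_req_time": 100.063,
    "decode_recv_first_token_time": 100.070,
    "decode_inference_start_time": 100.072,
    "decode_recv_second_token_time": 100.080,
}
# Matches the call site below: the caller passes token_timestamps[1:], so the
# empty packet before the first real token is already filtered out.
summary = metrics_summary([first_chunk, last_chunk], [100.071, 100.081])
print(summary["prefill_cost_time"])  # ~0.060 s (send_request_output_to_decode_time - arrival_time)
```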
async def async_request_eb_openai_chat_completions(
@@ -97,6 +165,7 @@ async def async_request_eb_openai_chat_completions(
"continuous_usage_stats": True,
},
"max_tokens": request_func_input.output_len,
"collect_metrics": request_func_input.pd_metrics,
}
if request_func_input.response_format:
payload["response_format"] = request_func_input.response_format
@@ -125,11 +194,13 @@ async def async_request_eb_openai_chat_completions(
output = RequestFuncOutput()
output.prompt_len = 0
output.no = request_func_input.no
metrics_list = []
request_id = "None"
ttft = 0.0
st = time.perf_counter()
most_recent_timestamp = st
token_timestamps = []
try:
async with session.post(url=api_url, json=payload, headers=headers) as response:
data = {}
@@ -144,6 +215,10 @@ async def async_request_eb_openai_chat_completions(
# print("####chunk:", chunk, type(chunk))
timestamp = time.perf_counter()
data = json.loads(chunk)
# print("####data:", json.dumps(data, indent=2, ensure_ascii=False))
if "metrics" in data:
metrics_list.append(data["metrics"])
if request_id == "None" and "id" in data:
request_id = data["id"]
@@ -169,16 +244,22 @@ async def async_request_eb_openai_chat_completions(
output.generated_text += content or ""
output.reasoning_content += reason_content or ""
# print(f"####content:{data}")
output.arrival_time.append(choices[0].get("arrival_time", timestamp))
elif usage := data.get("usage", {}):
output.output_tokens = usage.get("completion_tokens", 0)
output.prompt_tokens = usage.get("prompt_tokens", 0)
most_recent_timestamp = timestamp
token_timestamps.append(time.time())
# output.generated_text = generated_text
# At the end of the stream, record the timestamp of the last received chunk
output.end_timestamp = most_recent_timestamp
# New: compute the metrics summary; slice off the first timestamp to skip the empty packet before the first token
output.metrics = metrics_summary(metrics_list, token_timestamps[1:])
if output.generated_text.strip() == "":
output.success = False
output.error = "No generated text found!"

View File

@@ -318,6 +318,7 @@ async def benchmark(
selected_percentiles: list[float],
ignore_eos: bool,
debug: bool,
pd_metrics: bool,
goodput_config_dict: dict[str, float],
max_concurrency: Optional[int],
lora_modules: Optional[Iterable[str]],
@@ -352,6 +353,7 @@ async def benchmark(
logprobs=logprobs,
ignore_eos=ignore_eos,
debug=debug,
pd_metrics=pd_metrics,
extra_body=extra_body,
response_format=response_format,
random_flag=random_flag,
@@ -446,6 +448,7 @@ async def benchmark(
output_len=output_len,
logprobs=logprobs,
debug=debug,
pd_metrics=pd_metrics,
ignore_eos=ignore_eos,
extra_body=extra_body,
response_format=response_format,
@@ -548,6 +551,7 @@ async def benchmark(
"generated_texts": [output.generated_text for output in outputs],
"reasoning_contents": [output.reasoning_content for output in outputs],
"errors": [output.error for output in outputs],
"metrics": [output.metrics for output in outputs],
}
def process_one_metric(
@@ -583,6 +587,49 @@ async def benchmark(
print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):", value))
result[f"p{p_word}_{metric_attribute_name}_ms"] = value
def process_pd_metrics(model_outputs, metric_key):
# Collect every observed value for this metric
values = []
percentiles = []
for p in args.metric_percentiles.split(","):
p = p.strip()
if p:
percentiles.append(float(p))
for item in model_outputs:
metrics = item.metrics
if metrics.get(metric_key, None) is not None:
values.append(metrics[metric_key])
if not values:
print(f"[WARN] metric_key '{metric_key}' not found in outputs.")
return
arr = np.array(values) * 1000  # seconds -> milliseconds
print("{s:{c}^{n}}".format(s=metric_key, n=50, c="-"))
print(
"{:<40} {:<10.2f}".format(
f"Mean {metric_key} (ms):",
np.mean(arr),
)
)
print(
"{:<40} {:<10.2f}".format(
f"Median {metric_key} (ms):",
np.median(arr),
)
)
for p in percentiles:
v = np.percentile(arr, p)
print("{:<40} {:<10.2f}".format(f"P{str(int(p)) if int(p) == p else str(p)} {metric_key} (ms):", v))
# print(f"P{str(int(p)) if int(p) == p else str(p)} {metric_key} (ms): {v:10.2f}")
print("{:<40} {:<10}".format(f"Successful {metric_key}:", len(arr)))
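
A side note on the percentile labels above: integral percentiles print without a decimal point. A quick standalone check of that expression:

```python
# Mirrors the P-label expression used in process_pd_metrics above.
for p in (25.0, 99.0, 99.9):
    print(f"P{str(int(p)) if int(p) == p else str(p)}")
# -> P25, P99, P99.9
```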
def process_one_length(
# E.g., "ttft"
metric_attribute_name: str,
@@ -624,6 +671,19 @@ async def benchmark(
process_one_metric("s_itl", "S_ITL", "Infer Inter-token Latency")
process_one_metric("e2el", "E2EL", "End-to-end Latency")
process_one_metric("s_e2el", "S_E2EL", "Infer End-to-end Latency")
if any(item.metrics for item in outputs):
process_pd_metrics(outputs, "prefill_cost_time")
process_pd_metrics(outputs, "prefill_prepare_cost_time")
process_pd_metrics(outputs, "preprocess_cost_time")
process_pd_metrics(outputs, "cache_in_scheduler_cost_time")
process_pd_metrics(outputs, "ask_decode_resource_cost_time")
process_pd_metrics(outputs, "prefill_first_token_infer_cost_time")
process_pd_metrics(outputs, "wait_sending_cache_cost_time")
process_pd_metrics(outputs, "decode_preallocate_cost_time")
process_pd_metrics(outputs, "decode_prepare_cost_time")
process_pd_metrics(outputs, "decode_second_token_infer_cost_time")
process_pd_metrics(outputs, "first_token_transmission_cost_time")
process_pd_metrics(outputs, "second_token_transmission_cost_time")
process_one_length("input_len", "Cached Tokens", "Cached Tokens")
process_one_length("s_input_len", "Input Length", "Infer Input Length")
process_one_length("output_len", "Output Length", "Output Length")
@@ -941,6 +1001,7 @@ def main(args: argparse.Namespace):
selected_percentiles=[float(p) for p in args.metric_percentiles.split(",")],
ignore_eos=args.ignore_eos,
debug=args.debug,
pd_metrics=args.pd_metrics,
goodput_config_dict=goodput_config_dict,
max_concurrency=args.max_concurrency,
lora_modules=args.lora_modules,
@@ -1129,6 +1190,11 @@ if __name__ == "__main__":
action="store_true",
help="shuffle dataset",
)
parser.add_argument(
"--pd-metrics",
action="store_true",
help="请求时增加PD分离参数metrics: True",
)
parser.add_argument(
"--drop-ratio",
type=float,