def safe_cost(a, b):
    """Return ``a - b``, or ``None`` when either operand is ``None``.

    Args:
        a: End value (minuend), or ``None`` if it was not reported.
        b: Start value (subtrahend), or ``None`` if it was not reported.

    Returns:
        The difference ``a - b``, or ``None`` if it cannot be computed.
    """
    if a is None or b is None:
        return None
    return a - b


# Each row is (summary key, end-timestamp key, start-timestamp key).
# Rows in _PREFILL_COST_SPECS are read from the first metrics entry.
_PREFILL_COST_SPECS = (
    # total prefill cost
    ("prefill_cost_time", "send_request_output_to_decode_time", "arrival_time"),
    # prefill preparation cost
    ("prefill_prepare_cost_time", "inference_start_time", "arrival_time"),
    # preprocessing cost
    ("preprocess_cost_time", "scheduler_recv_req_time", "arrival_time"),
    # time the request spent cached in the scheduler
    ("cache_in_scheduler_cost_time", "engine_get_req_time", "scheduler_recv_req_time"),
    # cost of asking for decode resources
    (
        "ask_decode_resource_cost_time",
        "ask_decode_resource_finish_time",
        "ask_decode_resource_start_time",
    ),
    # prefill first-token inference cost
    ("prefill_first_token_infer_cost_time", "engine_recv_first_token_time", "inference_start_time"),
    # prefill wait for cache transfer
    (
        "wait_sending_cache_cost_time",
        "send_request_output_to_decode_time",
        "wait_for_sending_cache_time",
    ),
)

# Rows in _DECODE_COST_SPECS are read from the last metrics entry.
_DECODE_COST_SPECS = (
    # decode resource pre-allocation cost
    ("decode_preallocate_cost_time", "decode_preallocate_req_time", "decode_recv_req_time"),
    # decode inference preparation cost
    ("decode_prepare_cost_time", "decode_inference_start_time", "decode_recv_first_token_time"),
    # decode second-token inference cost
    (
        "decode_second_token_infer_cost_time",
        "decode_recv_second_token_time",
        "decode_inference_start_time",
    ),
)


def metrics_summary(metrics, token_timestamps):
    """Summarize per-request timing metrics into named durations.

    Keys named after prefill stages are computed from the first metrics entry,
    keys named after decode stages from the last one. The two transmission
    keys compare the first two ``token_timestamps`` with the decode-side
    receive times of the last entry.

    Args:
        metrics: List of metrics dicts collected from the response chunks.
            Entries that are not dicts (for example ``None``) are ignored
            instead of raising ``AttributeError``.
        token_timestamps: Client-side timestamps; index 0 and index 1 are
            used as the first and second token arrival times.
            NOTE(review): presumably on the same clock and in the same unit
            as the server-reported timestamps - confirm.

    Returns:
        A dict mapping each duration name to ``end - start``, or to ``None``
        when either timestamp is missing. An empty dict is returned when no
        usable metrics entry exists or fewer than two token timestamps were
        recorded.
    """
    if not metrics or not token_timestamps or len(token_timestamps) < 2:
        return {}

    # A chunk may carry a "metrics" key whose value is null; skip such entries.
    usable = [entry for entry in metrics if isinstance(entry, dict)]
    if not usable:
        return {}

    first, last = usable[0], usable[-1]

    summary = {}
    for name, end_key, start_key in _PREFILL_COST_SPECS:
        summary[name] = safe_cost(first.get(end_key), first.get(start_key))
    for name, end_key, start_key in _DECODE_COST_SPECS:
        summary[name] = safe_cost(last.get(end_key), last.get(start_key))

    # latency of returning the first token to the client
    summary["first_token_transmission_cost_time"] = safe_cost(
        token_timestamps[0], last.get("decode_recv_first_token_time")
    )
    # latency of returning the second token to the client
    summary["second_token_transmission_cost_time"] = safe_cost(
        token_timestamps[1], last.get("decode_recv_second_token_time")
    )

    return summary
# print("####chunk:", chunk, type(chunk)) timestamp = time.perf_counter() data = json.loads(chunk) + # print("####data:", json.dumps(data, indent=2, ensure_ascii=False)) + + if "metrics" in data: + metrics_list.append(data["metrics"]) if request_id == "None" and "id" in data: request_id = data["id"] @@ -169,16 +244,22 @@ async def async_request_eb_openai_chat_completions( output.generated_text += content or "" output.reasoning_content += reason_content or "" + # print(f"####content:{data}") output.arrival_time.append(choices[0].get("arrival_time", timestamp)) elif usage := data.get("usage", {}): output.output_tokens = usage.get("completion_tokens", 0) output.prompt_tokens = usage.get("prompt_tokens", 0) most_recent_timestamp = timestamp + token_timestamps.append(time.time()) # output.generated_text = generated_text # 在流式结束时,记录最后一个 chunk 收到的时间戳 output.end_timestamp = most_recent_timestamp + + # 新增metrics统计,计算首token过滤空包 + output.metrics = metrics_summary(metrics_list, token_timestamps[1:]) + if output.generated_text.strip() == "": output.success = False output.error = "No generated text found!" 
def process_pd_metrics(model_outputs, metric_key, percentiles=None):
    """Print mean, median and percentile statistics for one PD metric.

    Args:
        model_outputs: Iterable of request outputs; each item exposes a
            ``metrics`` dict. Items whose ``metrics`` lacks ``metric_key`` or
            holds ``None`` for it are skipped.
        metric_key: Name of the metric to aggregate.
        percentiles: Percentiles to report. Defaults to the
            ``selected_percentiles`` of the enclosing ``benchmark`` call, so
            the helper no longer depends on the script-level ``args`` global.

    Returns:
        None. Results are printed; a warning is printed when no output
        carries a value for ``metric_key``.
    """
    if percentiles is None:
        # Reuse the percentiles already parsed for the surrounding benchmark run.
        percentiles = selected_percentiles

    values = [
        item.metrics[metric_key]
        for item in model_outputs
        if item.metrics.get(metric_key) is not None
    ]

    if not values:
        print(f"[WARN] metric_key '{metric_key}' not found in outputs.")
        return

    # Scale by 1000 for display in ms (the values are assumed to be seconds).
    arr = np.array(values) * 1000

    print("{s:{c}^{n}}".format(s=metric_key, n=50, c="-"))
    print("{:<40} {:<10.2f}".format(f"Mean {metric_key} (ms):", np.mean(arr)))
    print("{:<40} {:<10.2f}".format(f"Median {metric_key} (ms):", np.median(arr)))
    for p in percentiles:
        # Whole-number percentiles are labelled without a trailing ".0".
        p_word = str(int(p)) if int(p) == p else str(p)
        print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_key} (ms):", np.percentile(arr, p)))
    print("{:<40} {:<10.2f}".format(f"Successful {metric_key}:", len(arr)))
-1129,6 +1190,11 @@ if __name__ == "__main__": action="store_true", help="shuffle dataset", ) + parser.add_argument( + "--pd-metrics", + action="store_true", + help="请求时增加PD分离参数,metrics: True", + ) parser.add_argument( "--drop-ratio", type=float,