Mirror of https://github.com/PaddlePaddle/FastDeploy.git, synced 2025-10-04 08:16:42 +08:00
Adapt to vLLM responses that lack arrival_time; adapt to vLLM requiring the model field to be passed; add a case number field no to RequestFuncInput, RequestFuncOutput, and SampleRequest
@@ -36,6 +36,7 @@ AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)
 @dataclass
 class RequestFuncInput:
     """Input for requesting LLMs via API"""
+    no: int
     prompt: str
     history_QA: Optional[dict]
     hyper_parameters: dict
@@ -54,6 +55,7 @@ class RequestFuncInput:
 @dataclass
 class RequestFuncOutput:
     """Output for requesting LLMs via API"""
+    no: int = 0
     generated_text: str = ""
     reasoning_content: str = ""
     success: bool = False
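
For context, a minimal sketch of how the new case number is threaded from input to output so each result can be traced back to its originating sample. Field sets are trimmed to what the hunks show, and the run_request helper is hypothetical:

from dataclasses import dataclass
from typing import Optional

@dataclass
class RequestFuncInput:
    """Input for requesting LLMs via API (trimmed to a few fields)."""
    no: int                     # case number of the originating sample
    prompt: str
    history_QA: Optional[dict] = None

@dataclass
class RequestFuncOutput:
    """Output for requesting LLMs via API (trimmed to a few fields)."""
    no: int = 0                 # copied from RequestFuncInput.no
    generated_text: str = ""
    success: bool = False

def run_request(req: RequestFuncInput) -> RequestFuncOutput:
    out = RequestFuncOutput()
    out.no = req.no             # mirrors output.no = request_func_input.no in the diff
    out.generated_text = "..."  # filled from the streamed response in the real client
    out.success = True
    return out
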
@@ -84,7 +86,7 @@ async def async_request_eb_openai_chat_completions(
         if request_func_input.multi_modal_content:
             content.append(request_func_input.multi_modal_content)
         payload = {
-            "model": "default",
+            "model": request_func_input.model,
             "messages": request_func_input.history_QA,
             "stream": True,
             "stream_options": {
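
The hard-coded "default" model name is replaced with the model passed in the request input; per the commit title, vLLM requires the model field, and its OpenAI-compatible server rejects names it does not serve. A minimal sketch of the resulting payload construction (build_chat_payload is a hypothetical helper, and the stream_options contents are an assumption since the hunk truncates them):

def build_chat_payload(request_func_input) -> dict:
    """Hypothetical helper: build the chat payload with the served model name."""
    return {
        "model": request_func_input.model,          # was hard-coded to "default"
        "messages": request_func_input.history_QA,  # chat history in OpenAI format
        "stream": True,
        "stream_options": {"include_usage": True},  # assumed; truncated in the hunk
    }
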
@@ -97,6 +99,9 @@ async def async_request_eb_openai_chat_completions(
 
         if request_func_input.ignore_eos:
             payload["ignore_eos"] = request_func_input.ignore_eos
+
+        print("payload:{}".format(json.dumps(payload, ensure_ascii=False)))
+
         headers = {
             "Content-Type": "application/json",
             "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
@@ -104,6 +109,7 @@ async def async_request_eb_openai_chat_completions(
         output = RequestFuncOutput()
         output.prompt_len = 0
+        output.no = request_func_input.no
 
         ttft = 0.0
         st = time.perf_counter()
@@ -132,7 +138,8 @@ async def async_request_eb_openai_chat_completions(
                             ttft = timestamp - st
                             output.ttft = ttft
                             # cached_tokens
-                            output.prompt_len = data["usage"]["prompt_tokens_details"]["cached_tokens"]
+                            output.prompt_len = data["usage"].get("prompt_tokens_details", {}).get("cached_tokens", 0)
 
                         # Decoding phase
                         else:
@@ -141,12 +148,12 @@ async def async_request_eb_openai_chat_completions(
                         output.generated_text += content or ""
                         output.reasoning_content += reason_content or ""
-                        output.arrival_time.append(choices[0].get("arrival_time"))
-                    elif usage := data.get("usage"):
+                        output.arrival_time.append(choices[0].get("arrival_time", timestamp))
+                    elif usage := data.get("usage", {}):
                         output.output_tokens = usage.get(
-                            "completion_tokens")
+                            "completion_tokens", 0)
                         output.prompt_tokens = usage.get(
-                            "prompt_tokens")
+                            "prompt_tokens", 0)
 
                     most_recent_timestamp = timestamp
 
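
Because vLLM's OpenAI-compatible stream carries no arrival_time and may omit usage counters, the parsing now falls back to the client-side timestamp and to zero token counts. A minimal sketch of that defensive handling (record_chunk is a hypothetical helper; output stands for a RequestFuncOutput-like object):

def record_chunk(output, data: dict, timestamp: float) -> None:
    """Hypothetical helper: parse one streamed chunk without assuming
    backend-specific fields such as arrival_time are present."""
    if choices := data.get("choices"):
        delta = choices[0].get("delta", {})
        output.generated_text += delta.get("content") or ""
        # vLLM does not emit arrival_time; fall back to the client-side timestamp.
        output.arrival_time.append(choices[0].get("arrival_time", timestamp))
    elif usage := data.get("usage", {}):
        # Missing counters default to 0 instead of None so later arithmetic never fails.
        output.output_tokens = usage.get("completion_tokens", 0)
        output.prompt_tokens = usage.get("prompt_tokens", 0)
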
@@ -173,6 +180,7 @@ async def async_request_eb_openai_chat_completions(
             f.write(str(output) + "\n")
     if pbar:
         pbar.update(1)
+    print("#####final_output:", output)
     return output
 
 
@@ -189,7 +197,7 @@ async def async_request_eb_openai_completions(
     async with aiohttp.ClientSession(trust_env=True,
                                      timeout=AIOHTTP_TIMEOUT) as session:
         payload = {
-            "model": "default",
+            "model": request_func_input.model,
             "prompt": request_func_input.prompt,
             "stream": True,
             "stream_options": {
@@ -202,14 +210,20 @@ async def async_request_eb_openai_completions(
         if request_func_input.ignore_eos:
             payload["ignore_eos"] = request_func_input.ignore_eos
+
+        print("payload:", json.dumps(payload, ensure_ascii=False))
+
         headers = {
-            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
+            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
+            "Content-Type": "application/json"
         }
 
         output = RequestFuncOutput()
         output.prompt_len = request_func_input.prompt_len
+        output.no = request_func_input.no
 
         generated_text = ""
         ttft = 0.0
         st = time.perf_counter()
         most_recent_timestamp = st
         try:
@@ -226,6 +240,7 @@ async def async_request_eb_openai_completions(
                             "data: ")
                         if chunk != "[DONE]":
                             # print("####chunk:", chunk, chunk.usage)
+                            timestamp = time.perf_counter()
                             data = json.loads(chunk)
 
                             # NOTE: Some completion API might have a last
@@ -235,11 +250,11 @@ async def async_request_eb_openai_completions(
                                 # Note that text could be empty here
                                 # e.g. for special tokens
                                 text = choices[0].get("text")
-                                timestamp = time.perf_counter()
 
                                 # First token
                                 if not first_chunk_received:
                                     first_chunk_received = True
-                                    ttft = time.perf_counter() - st
+                                    ttft = timestamp - st
                                     output.ttft = ttft
 
                                 # Decoding phase
@@ -247,9 +262,10 @@ async def async_request_eb_openai_completions(
                                     output.itl.append(timestamp -
                                                       most_recent_timestamp)
 
-                                most_recent_timestamp = timestamp
-                                output.arrival_time.append(choices[0].get("arrival_time"))
                                 generated_text += text or ""
+
+                                most_recent_timestamp = timestamp
+                                output.arrival_time.append(choices[0].get("arrival_time", timestamp))
                             elif usage := data.get("usage"):
                                 output.prompt_tokens = usage.get(
                                     "prompt_tokens")
@@ -262,8 +278,15 @@ async def async_request_eb_openai_completions(
                         output.error = (
                             "Never received a valid chunk to calculate TTFT."
                             "This response will be marked as failed!")
+
                     output.generated_text = generated_text
                     output.latency = most_recent_timestamp - st
+
+                    if output.generated_text == "":
+                        output.success = False
+                        output.error = "No generated text found!"
+                    else:
+                        output.success = True
                 else:
                     output.error = response.reason or ""
                     output.success = False
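
A minimal sketch of the post-stream validation added above (finalize is a hypothetical helper): an empty generation now marks the request as failed even when the HTTP call itself returned 200.

def finalize(output, generated_text: str, latency: float):
    """Hypothetical helper mirroring the added check."""
    output.generated_text = generated_text
    output.latency = latency
    if output.generated_text == "":
        output.success = False
        output.error = "No generated text found!"
    else:
        output.success = True
    return output
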
@@ -272,6 +295,8 @@ async def async_request_eb_openai_completions(
             exc_info = sys.exc_info()
             output.error = "".join(traceback.format_exception(*exc_info))
 
+    print("final_output:{}".format(output))
+
     if pbar:
         pbar.update(1)
     return output
@@ -38,7 +38,7 @@ class SampleRequest:
     """
     Represents a single inference request for benchmarking.
     """
 
+    no: int
     prompt: Union[str, Any]
     history_QA: Union[str, Any]
     json_data: Optional[dict]
@@ -229,6 +229,7 @@ class EBDataset(BenchmarkDataset):
         **kwargs,
     ) -> list:
         samples: list = []
+        cnt = 1
         for entry in self.data:
             if len(samples) >= num_requests:
                 break
@@ -246,16 +247,17 @@ class EBDataset(BenchmarkDataset):
                 prompt, None)
             samples.append(
                 SampleRequest(
+                    no=cnt,
                     prompt=prompt,
                     prompt_len=self.prompt_len,
                     history_QA=[],
                     expected_output_len=new_output_len,
                 ))
+            cnt += 1
 
         self.maybe_oversample_requests(samples, num_requests)
         return samples
 
 
 class EBChatDataset(BenchmarkDataset):
     """
     Implements the ShareGPT dataset. Loads data from a JSON file and generates
@@ -284,6 +286,7 @@ class EBChatDataset(BenchmarkDataset):
         **kwargs,
     ) -> list:
         samples: list = []
+        cnt = 1
         for entry in self.data:
             if len(samples) >= num_requests:
                 break
@@ -297,12 +300,14 @@ class EBChatDataset(BenchmarkDataset):
                 prompt, None)
             samples.append(
                 SampleRequest(
+                    no=cnt,
                     json_data=json_data,
                     prompt=prompt,
                     prompt_len=0,
                     history_QA=history_QA,
                     expected_output_len=new_output_len,
                 ))
+            cnt += 1
 
         self.maybe_oversample_requests(samples, num_requests)
         return samples
@@ -182,6 +182,7 @@ def calculate_metrics(
             # len(outputs[i].itl) since multiple output tokens may be
             # bundled together
             # Note : this may inflate the output token count slightly
             continue
 
         actual_output_lens.append(output_len)
         input_lens.append(outputs[i].prompt_len)
@@ -209,6 +210,8 @@ def calculate_metrics(
             if len(outputs[i].arrival_time) > 2:
                 s_decodes.append((outputs[i].output_tokens - 1) /
                                  (outputs[i].arrival_time[-1] - outputs[i].arrival_time[1]))
+            else:
+                print("len(outputs[i].arrival_time) <= 2")
             completed += 1
         else:
             actual_output_lens.append(0)
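
The s_decode statistic above divides the number of decode steps by the span of arrival times, skipping the first interval (which contains the prefill). A small worked example with hypothetical numbers:

# Hypothetical stream of 5 decoded tokens with client-side arrival timestamps (s).
output_tokens = 5
arrival_time = [10.00, 10.80, 10.85, 10.90, 10.95]

# (tokens - 1) decode steps over the window from the first decoded chunk to the
# last one: (5 - 1) / (10.95 - 10.80) ≈ 26.7 tok/s. Needs at least 3 timestamps.
if len(arrival_time) > 2:
    s_decode = (output_tokens - 1) / (arrival_time[-1] - arrival_time[1])
    print(f"decode speed: {s_decode:.1f} tok/s")
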
@@ -341,15 +344,16 @@ async def benchmark(
         raise ValueError(f"Unknown backend: {backend}")
 
     print("Starting initial single prompt test run...")
-    test_prompt, test_output_len = \
+    test_prompt, test_output_len, test_no = \
         input_requests[0].prompt, \
-        input_requests[0].expected_output_len
+        input_requests[0].expected_output_len, input_requests[0].no
     test_history_QA = input_requests[0].history_QA
 
     test_input = RequestFuncInput(
         model=model_id,
         model_name=model_name,
         prompt=test_prompt,
+        no=test_no,
         prompt_len=0,
         history_QA=test_history_QA,
         hyper_parameters=hyper_parameters,
@@ -384,6 +388,7 @@ async def benchmark(
         profile_input = RequestFuncInput(model=model_id,
                                          model_name=model_name,
                                          prompt=test_prompt,
+                                         no=test_no,
                                          api_url=base_url + "/start_profile",
                                          output_len=test_output_len,
                                          logprobs=logprobs,
@@ -422,7 +427,7 @@ async def benchmark(
     benchmark_start_time = time.perf_counter()
    tasks: list[asyncio.Task] = []
     async for request in get_request(input_requests, request_rate, burstiness):
-        prompt, output_len = request.prompt, request.expected_output_len
+        prompt, output_len, no = request.prompt, request.expected_output_len, request.no
         history_QA = request.history_QA
 
         req_model_id, req_model_name = model_id, model_name
@@ -433,6 +438,7 @@ async def benchmark(
         request_func_input = RequestFuncInput(model=req_model_id,
                                               model_name=req_model_name,
                                               prompt=prompt,
+                                              no=no,
                                               prompt_len=0,
                                               history_QA=history_QA,
                                               hyper_parameters=hyper_parameters,
@@ -452,6 +458,7 @@ async def benchmark(
         profile_input = RequestFuncInput(
             model=model_id,
             prompt=test_prompt,
+            no=test_no,
             api_url=base_url + "/stop_profile",
             output_len=test_output_len,
             logprobs=logprobs,
@@ -464,6 +471,8 @@ async def benchmark(
         pbar.close()
 
     benchmark_duration = time.perf_counter() - benchmark_start_time
+    print("benchmark_duration:", benchmark_duration)
+
 
     metrics, actual_output_lens = calculate_metrics(
         input_requests=input_requests,
@@ -594,6 +603,155 @@ async def benchmark(
     return result
 
 
+def benchmark_metrics(
+    benchmark_duration: float,
+    result_file: str,
+    selected_percentiles: list[float],
+    selected_percentile_metrics: list[str],
+    goodput_config_dict: dict[str, float],
+):
+    """Benchmark metrics statistics,generate benchmark result"""
+    outputs = []
+    case_no_list = []
+    with open(result_file) as f:
+        for line in f.readlines():
+            if "RequestFuncOutput" in line:
+                start = line.find("RequestFuncOutput")
+                end = line.rfind(")")
+                para_str = line[start:end + 1]
+
+                output = eval(para_str)
+                outputs.append(output)
+
+    input_requests = [[]] * len(outputs)
+    goodput_config_dict = check_goodput_args(args)
+
+    metrics, actual_output_lens = calculate_metrics(
+        input_requests=input_requests,
+        outputs=outputs,
+        dur_s=benchmark_duration,
+        selected_percentiles=selected_percentiles,
+        goodput_config_dict=goodput_config_dict,
+    )
+
+    print("{s:{c}^{n}}".format(s=' Serving Benchmark Result ', n=50, c='='))
+    print("{:<40} {:<10}".format("Successful requests:", metrics.completed))
+    print("{:<40} {:<10.2f}".format("Benchmark duration (s):",
+                                    benchmark_duration))
+    print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input))
+    print("{:<40} {:<10}".format("Total generated tokens:",
+                                 metrics.total_output))
+    print("{:<40} {:<10.2f}".format("Request throughput (req/s):",
+                                    metrics.request_throughput))
+    if goodput_config_dict:
+        print("{:<40} {:<10.2f}".format("Request goodput (req/s):",
+                                        metrics.request_goodput))
+    print("{:<40} {:<10.2f}".format("Output token throughput (tok/s):",
+                                    metrics.output_throughput))
+    print("{:<40} {:<10.2f}".format("Total Token throughput (tok/s):",
+                                    metrics.total_token_throughput))
+
+    result = {
+        "duration": benchmark_duration,
+        "completed": metrics.completed,
+        "total_input_tokens": metrics.total_input,
+        "total_output_tokens": metrics.total_output,
+        "request_throughput": metrics.request_throughput,
+        "request_goodput:":
+        metrics.request_goodput if goodput_config_dict else None,
+        "output_throughput": metrics.output_throughput,
+        "total_token_throughput": metrics.total_token_throughput,
+        "input_lens": [output.prompt_len for output in outputs],
+        "output_lens": actual_output_lens,
+        "ttfts": [output.ttft for output in outputs],
+        "itls": [output.itl for output in outputs],
+        "input_texts": ["" for input in input_requests],
+        "generated_texts": [output.generated_text for output in outputs],
+        "errors": [output.error for output in outputs],
+    }
+
+    def process_one_metric(
+        # E.g., "ttft"
+        metric_attribute_name: str,
+        # E.g., "TTFT"
+        metric_name: str,
+        # E.g., "Time to First Token"
+        metric_header: str,
+    ):
+        # This function prints and adds statistics of the specified
+        # metric.
+        if metric_attribute_name not in selected_percentile_metrics:
+            return
+        print("{s:{c}^{n}}".format(s=metric_header, n=50, c='-'))
+        print("{:<40} {:<10.2f}".format(
+            f"Mean {metric_name} (ms):",
+            getattr(metrics, f"mean_{metric_attribute_name}_ms")))
+        print("{:<40} {:<10.2f}".format(
+            f"Median {metric_name} (ms):",
+            getattr(metrics, f"median_{metric_attribute_name}_ms")))
+        result[f"mean_{metric_attribute_name}_ms"] = getattr(
+            metrics, f"mean_{metric_attribute_name}_ms")
+        result[f"median_{metric_attribute_name}_ms"] = getattr(
+            metrics, f"median_{metric_attribute_name}_ms")
+        result[f"std_{metric_attribute_name}_ms"] = getattr(
+            metrics, f"std_{metric_attribute_name}_ms")
+        for p, value in getattr(metrics,
+                                f"percentiles_{metric_attribute_name}_ms"):
+            p_word = str(int(p)) if int(p) == p else str(p)
+            print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):",
+                                            value))
+            result[f"p{p_word}_{metric_attribute_name}_ms"] = value
+
+    def process_one_length(
+        # E.g., "ttft"
+        metric_attribute_name: str,
+        # E.g., "TTFT"
+        metric_name: str,
+        # E.g., "Time to First Token"
+        metric_header: str,
+    ):
+        # This function prints and adds statistics of the specified
+        # metric.
+        if metric_attribute_name not in selected_percentile_metrics:
+            return
+        print("{s:{c}^{n}}".format(s=metric_header, n=50, c='-'))
+        print("{:<40} {:<10.2f}".format(
+            f"Mean {metric_name}:",
+            getattr(metrics, f"mean_{metric_attribute_name}")))
+        print("{:<40} {:<10.2f}".format(
+            f"Median {metric_name}:",
+            getattr(metrics, f"median_{metric_attribute_name}")))
+        result[f"mean_{metric_attribute_name}"] = getattr(
+            metrics, f"mean_{metric_attribute_name}")
+        result[f"median_{metric_attribute_name}"] = getattr(
+            metrics, f"median_{metric_attribute_name}")
+        result[f"std_{metric_attribute_name}"] = getattr(
+            metrics, f"std_{metric_attribute_name}")
+        for p, value in getattr(metrics,
+                                f"percentiles_{metric_attribute_name}"):
+            p_word = str(int(p)) if int(p) == p else str(p)
+            print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name}:",
+                                            value))
+            result[f"p{p_word}_{metric_attribute_name}"] = value
+
+    process_one_length("s_decode", "Decode", "解码速度(tok/s)")
+    process_one_metric("ttft", "TTFT", "Time to First Token")
+    process_one_metric("s_ttft", "S_TTFT", "Infer Time to First Token")
+    process_one_metric("tpot", "TPOT",
+                       "Time per Output Token (excl. 1st token)")
+    process_one_metric("itl", "ITL", "Inter-token Latency")
+    process_one_metric("s_itl", "S_ITL", "Infer Inter-token Latency")
+    process_one_metric("e2el", "E2EL", "End-to-end Latency")
+    process_one_metric("s_e2el", "S_E2EL", "Infer End-to-end Latency")
+    process_one_length("input_len", "Input Length", "Input Length")
+    process_one_length("s_input_len", "Input Length", "Infer Input Length")
+    process_one_length("output_len", "Output Length", "Output Length")
+
+    print("=" * 50)
+
+    return result
+
+
 def check_goodput_args(args):
     """Check whether the given argument has valid goodput configuration or not"""
     # Check and parse goodput arguments
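
benchmark_metrics rebuilds RequestFuncOutput objects by eval-ing the repr lines that benchmark() appended to the result file. A minimal sketch of that round trip (the dataclass is trimmed to a few fields and the sample line is hypothetical):

from dataclasses import dataclass, field

@dataclass
class RequestFuncOutput:
    """Trimmed stand-in for the real dataclass."""
    no: int = 0
    generated_text: str = ""
    ttft: float = 0.0
    itl: list = field(default_factory=list)

# A line as benchmark() would have appended it to the result file (values invented).
line = "RequestFuncOutput(no=3, generated_text='hi', ttft=0.12, itl=[0.01, 0.02])"

start = line.find("RequestFuncOutput")
end = line.rfind(")")
output = eval(line[start:end + 1])  # works because the dataclass repr round-trips
print(output.no, output.ttft)       # -> 3 0.12
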
@@ -760,6 +918,16 @@ def main(args: argparse.Namespace):
             extra_body=sampling_params,
         ))
 
+    # benchmark_result = benchmark_metrics(
+    #     benchmark_duration=3600,
+    #     result_file="your result file",
+    #     selected_percentile_metrics=args.percentile_metrics.split(","),
+    #     selected_percentiles=[
+    #         float(p) for p in args.metric_percentiles.split(",")
+    #     ],
+    #     goodput_config_dict=goodput_config_dict,
+    # )
+
     # Save config and results to json
     if args.save_result:
         result_json: dict[str, Any] = {}