diff --git a/benchmarks/backend_request_func.py b/benchmarks/backend_request_func.py
index 84b11d7a9..aacc94fab 100644
--- a/benchmarks/backend_request_func.py
+++ b/benchmarks/backend_request_func.py
@@ -36,6 +36,7 @@ AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)
 @dataclass
 class RequestFuncInput:
     """Input for requesting LLMs via API"""
+    no: int
     prompt: str
     history_QA: Optional[dict]
     hyper_parameters: dict
@@ -54,6 +55,7 @@ class RequestFuncInput:
 @dataclass
 class RequestFuncOutput:
     """Output for requesting LLMs via API"""
+    no: int = 0
     generated_text: str = ""
     reasoning_content: str = ""
     success: bool = False
@@ -84,7 +86,7 @@ async def async_request_eb_openai_chat_completions(
         if request_func_input.multi_modal_content:
             content.append(request_func_input.multi_modal_content)
         payload = {
-            "model": "default",
+            "model": request_func_input.model,
             "messages": request_func_input.history_QA,
             "stream": True,
             "stream_options": {
@@ -97,6 +99,9 @@ async def async_request_eb_openai_chat_completions(
 
         if request_func_input.ignore_eos:
             payload["ignore_eos"] = request_func_input.ignore_eos
+
+        print("payload:{}".format(json.dumps(payload, ensure_ascii=False)))
+
         headers = {
             "Content-Type": "application/json",
             "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
@@ -104,6 +109,7 @@ async def async_request_eb_openai_chat_completions(
 
         output = RequestFuncOutput()
         output.prompt_len = 0
+        output.no = request_func_input.no
 
         ttft = 0.0
         st = time.perf_counter()
@@ -132,7 +138,8 @@ async def async_request_eb_openai_chat_completions(
                                     ttft = timestamp - st
                                     output.ttft = ttft
                                     # cached_tokens
-                                    output.prompt_len = data["usage"]["prompt_tokens_details"]["cached_tokens"]
+                                    output.prompt_len = data["usage"].get("prompt_tokens_details", {}).get("cached_tokens", 0)
+
                                 # Decoding phase
                                 else:
@@ -141,12 +148,12 @@ async def async_request_eb_openai_chat_completions(
                                 output.generated_text += content or ""
                                 output.reasoning_content += reason_content or ""
-                                output.arrival_time.append(choices[0].get("arrival_time"))
-                            elif usage := data.get("usage"):
+                                output.arrival_time.append(choices[0].get("arrival_time", timestamp))
+                            elif usage := data.get("usage", {}):
                                 output.output_tokens = usage.get(
-                                    "completion_tokens")
+                                    "completion_tokens", 0)
                                 output.prompt_tokens = usage.get(
-                                    "prompt_tokens")
+                                    "prompt_tokens", 0)
 
                             most_recent_timestamp = timestamp
 
@@ -173,6 +180,7 @@ async def async_request_eb_openai_chat_completions(
         f.write(str(output) + "\n")
     if pbar:
         pbar.update(1)
+    print("#####final_output:", output)
     return output
 
@@ -189,7 +197,7 @@ async def async_request_eb_openai_completions(
     async with aiohttp.ClientSession(trust_env=True,
                                      timeout=AIOHTTP_TIMEOUT) as session:
         payload = {
-            "model": "default",
+            "model": request_func_input.model,
             "prompt": request_func_input.prompt,
             "stream": True,
             "stream_options": {
@@ -202,14 +210,20 @@ async def async_request_eb_openai_completions(
 
         if request_func_input.ignore_eos:
             payload["ignore_eos"] = request_func_input.ignore_eos
+
+        print("payload:", json.dumps(payload, ensure_ascii=False))
+
         headers = {
-            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
+            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
+            "Content-Type": "application/json"
         }
 
         output = RequestFuncOutput()
         output.prompt_len = request_func_input.prompt_len
+        output.no = request_func_input.no
 
         generated_text = ""
+        ttft = 0.0
         st = time.perf_counter()
         most_recent_timestamp = st
         try:
@@ -226,6 +240,7 @@ async def async_request_eb_openai_completions(
                             "data: ")
                         if chunk != "[DONE]":
# print("####chunk:", chunk, chunk.usage) + timestamp = time.perf_counter() data = json.loads(chunk) # NOTE: Some completion API might have a last @@ -235,21 +250,22 @@ async def async_request_eb_openai_completions( # Note that text could be empty here # e.g. for special tokens text = choices[0].get("text") - timestamp = time.perf_counter() + # First token if not first_chunk_received: first_chunk_received = True - ttft = time.perf_counter() - st + ttft = timestamp - st output.ttft = ttft # Decoding phase else: output.itl.append(timestamp - most_recent_timestamp) + + generated_text += text or "" most_recent_timestamp = timestamp - output.arrival_time.append(choices[0].get("arrival_time")) - generated_text += text or "" + output.arrival_time.append(choices[0].get("arrival_time", timestamp)) elif usage := data.get("usage"): output.prompt_tokens = usage.get( "prompt_tokens") @@ -262,8 +278,15 @@ async def async_request_eb_openai_completions( output.error = ( "Never received a valid chunk to calculate TTFT." "This response will be marked as failed!") + output.generated_text = generated_text output.latency = most_recent_timestamp - st + + if output.generated_text == "": + output.success = False + output.error = "No generated text found!" + else: + output.success = True else: output.error = response.reason or "" output.success = False @@ -271,6 +294,8 @@ async def async_request_eb_openai_completions( output.success = False exc_info = sys.exc_info() output.error = "".join(traceback.format_exception(*exc_info)) + + print("final_output:{}".format(output)) if pbar: pbar.update(1) diff --git a/benchmarks/benchmark_dataset.py b/benchmarks/benchmark_dataset.py index 2d8bcca34..59ab4b454 100644 --- a/benchmarks/benchmark_dataset.py +++ b/benchmarks/benchmark_dataset.py @@ -38,7 +38,7 @@ class SampleRequest: """ Represents a single inference request for benchmarking. """ - + no: int prompt: Union[str, Any] history_QA: Union[str, Any] json_data: Optional[dict] @@ -229,6 +229,7 @@ class EBDataset(BenchmarkDataset): **kwargs, ) -> list: samples: list = [] + cnt = 1 for entry in self.data: if len(samples) >= num_requests: break @@ -246,16 +247,17 @@ class EBDataset(BenchmarkDataset): prompt, None) samples.append( SampleRequest( + no=cnt, prompt=prompt, prompt_len=self.prompt_len, history_QA=[], expected_output_len=new_output_len, )) + cnt += 1 self.maybe_oversample_requests(samples, num_requests) return samples - class EBChatDataset(BenchmarkDataset): """ Implements the ShareGPT dataset. 
     Loads data from a JSON file and generates
@@ -284,6 +286,7 @@ class EBChatDataset(BenchmarkDataset):
         **kwargs,
     ) -> list:
         samples: list = []
+        cnt = 1
         for entry in self.data:
             if len(samples) >= num_requests:
                 break
@@ -297,12 +300,14 @@ class EBChatDataset(BenchmarkDataset):
                     prompt, None)
             samples.append(
                 SampleRequest(
+                    no=cnt,
                     json_data=json_data,
                     prompt=prompt,
                     prompt_len=0,
                     history_QA=history_QA,
                     expected_output_len=new_output_len,
                 ))
+            cnt += 1
         self.maybe_oversample_requests(samples, num_requests)
         return samples
 
diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index 924f96ad4..e015117b3 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -182,6 +182,7 @@ def calculate_metrics(
                # len(outputs[i].itl) since multiple output tokens may be
                # bundled together
                # Note : this may inflate the output token count slightly
+                continue
            actual_output_lens.append(output_len)
            input_lens.append(outputs[i].prompt_len)
@@ -209,6 +210,8 @@ def calculate_metrics(
            if len(outputs[i].arrival_time) > 2:
                s_decodes.append((outputs[i].output_tokens - 1) /
                                 (outputs[i].arrival_time[-1] - outputs[i].arrival_time[1]))
+            else:
+                print("len(outputs[i].arrival_time) <= 2")
            completed += 1
        else:
            actual_output_lens.append(0)
@@ -341,15 +344,16 @@ async def benchmark(
        raise ValueError(f"Unknown backend: {backend}")
 
    print("Starting initial single prompt test run...")
-    test_prompt, test_output_len = \
+    test_prompt, test_output_len, test_no = \
        input_requests[0].prompt, \
-        input_requests[0].expected_output_len
+        input_requests[0].expected_output_len, input_requests[0].no
    test_history_QA = input_requests[0].history_QA
    test_input = RequestFuncInput(
        model=model_id,
        model_name=model_name,
        prompt=test_prompt,
+        no=test_no,
        prompt_len=0,
        history_QA=test_history_QA,
        hyper_parameters=hyper_parameters,
@@ -384,6 +388,7 @@ async def benchmark(
        profile_input = RequestFuncInput(model=model_id,
                                         model_name=model_name,
                                         prompt=test_prompt,
+                                         no=test_no,
                                         api_url=base_url + "/start_profile",
                                         output_len=test_output_len,
                                         logprobs=logprobs,
@@ -422,7 +427,7 @@ async def benchmark(
    benchmark_start_time = time.perf_counter()
    tasks: list[asyncio.Task] = []
    async for request in get_request(input_requests, request_rate, burstiness):
-        prompt, output_len = request.prompt, request.expected_output_len
+        prompt, output_len, no = request.prompt, request.expected_output_len, request.no
        history_QA = request.history_QA
        req_model_id, req_model_name = model_id, model_name
 
@@ -433,6 +438,7 @@ async def benchmark(
        request_func_input = RequestFuncInput(model=req_model_id,
                                              model_name=req_model_name,
                                              prompt=prompt,
+                                              no=no,
                                              prompt_len=0,
                                              history_QA=history_QA,
                                              hyper_parameters=hyper_parameters,
@@ -452,6 +458,7 @@ async def benchmark(
        profile_input = RequestFuncInput(
            model=model_id,
            prompt=test_prompt,
+            no=test_no,
            api_url=base_url + "/stop_profile",
            output_len=test_output_len,
            logprobs=logprobs,
@@ -464,6 +471,8 @@ async def benchmark(
        pbar.close()
 
    benchmark_duration = time.perf_counter() - benchmark_start_time
+    print("benchmark_duration:", benchmark_duration)
+
 
    metrics, actual_output_lens = calculate_metrics(
        input_requests=input_requests,
@@ -594,6 +603,155 @@ async def benchmark(
    return result
 
+
+def benchmark_metrics(
+    benchmark_duration: float,
+    result_file: str,
+    selected_percentiles: list[float],
+    selected_percentile_metrics: list[str],
+    goodput_config_dict: dict[str, float],
+):
+    """Benchmark metrics statistics, generate benchmark result"""
+    outputs = []
+    case_no_list = []
+    with open(result_file) as f:
+        for line in f.readlines():
+            if "RequestFuncOutput" in line:
+                start = line.find("RequestFuncOutput")
+                end = line.rfind(")")
+                para_str = line[start:end + 1]
+
+                output = eval(para_str)
+                outputs.append(output)
+
+    input_requests = [[]] * len(outputs)
+    goodput_config_dict = check_goodput_args(args)
+
+    metrics, actual_output_lens = calculate_metrics(
+        input_requests=input_requests,
+        outputs=outputs,
+        dur_s=benchmark_duration,
+        selected_percentiles=selected_percentiles,
+        goodput_config_dict=goodput_config_dict,
+    )
+
+    print("{s:{c}^{n}}".format(s=' Serving Benchmark Result ', n=50, c='='))
+    print("{:<40} {:<10}".format("Successful requests:", metrics.completed))
+    print("{:<40} {:<10.2f}".format("Benchmark duration (s):",
+                                    benchmark_duration))
+    print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input))
+    print("{:<40} {:<10}".format("Total generated tokens:",
+                                 metrics.total_output))
+    print("{:<40} {:<10.2f}".format("Request throughput (req/s):",
+                                    metrics.request_throughput))
+    if goodput_config_dict:
+        print("{:<40} {:<10.2f}".format("Request goodput (req/s):",
+                                        metrics.request_goodput))
+    print("{:<40} {:<10.2f}".format("Output token throughput (tok/s):",
+                                    metrics.output_throughput))
+    print("{:<40} {:<10.2f}".format("Total Token throughput (tok/s):",
+                                    metrics.total_token_throughput))
+
+    result = {
+        "duration": benchmark_duration,
+        "completed": metrics.completed,
+        "total_input_tokens": metrics.total_input,
+        "total_output_tokens": metrics.total_output,
+        "request_throughput": metrics.request_throughput,
+        "request_goodput:":
+        metrics.request_goodput if goodput_config_dict else None,
+        "output_throughput": metrics.output_throughput,
+        "total_token_throughput": metrics.total_token_throughput,
+        "input_lens": [output.prompt_len for output in outputs],
+        "output_lens": actual_output_lens,
+        "ttfts": [output.ttft for output in outputs],
+        "itls": [output.itl for output in outputs],
+        "input_texts": ["" for input in input_requests],
+        "generated_texts": [output.generated_text for output in outputs],
+        "errors": [output.error for output in outputs],
+    }
+
+    def process_one_metric(
+        # E.g., "ttft"
+        metric_attribute_name: str,
+        # E.g., "TTFT"
+        metric_name: str,
+        # E.g., "Time to First Token"
+        metric_header: str,
+    ):
+        # This function prints and adds statistics of the specified
+        # metric.
+        if metric_attribute_name not in selected_percentile_metrics:
+            return
+        print("{s:{c}^{n}}".format(s=metric_header, n=50, c='-'))
+        print("{:<40} {:<10.2f}".format(
+            f"Mean {metric_name} (ms):",
+            getattr(metrics, f"mean_{metric_attribute_name}_ms")))
+        print("{:<40} {:<10.2f}".format(
+            f"Median {metric_name} (ms):",
+            getattr(metrics, f"median_{metric_attribute_name}_ms")))
+        result[f"mean_{metric_attribute_name}_ms"] = getattr(
+            metrics, f"mean_{metric_attribute_name}_ms")
+        result[f"median_{metric_attribute_name}_ms"] = getattr(
+            metrics, f"median_{metric_attribute_name}_ms")
+        result[f"std_{metric_attribute_name}_ms"] = getattr(
+            metrics, f"std_{metric_attribute_name}_ms")
+        for p, value in getattr(metrics,
+                                f"percentiles_{metric_attribute_name}_ms"):
+            p_word = str(int(p)) if int(p) == p else str(p)
+            print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):",
+                                            value))
+            result[f"p{p_word}_{metric_attribute_name}_ms"] = value
+
+    def process_one_length(
+        # E.g., "ttft"
+        metric_attribute_name: str,
+        # E.g., "TTFT"
+        metric_name: str,
+        # E.g., "Time to First Token"
+        metric_header: str,
+    ):
+        # This function prints and adds statistics of the specified
+        # metric.
+        if metric_attribute_name not in selected_percentile_metrics:
+            return
+        print("{s:{c}^{n}}".format(s=metric_header, n=50, c='-'))
+        print("{:<40} {:<10.2f}".format(
+            f"Mean {metric_name}:",
+            getattr(metrics, f"mean_{metric_attribute_name}")))
+        print("{:<40} {:<10.2f}".format(
+            f"Median {metric_name}:",
+            getattr(metrics, f"median_{metric_attribute_name}")))
+        result[f"mean_{metric_attribute_name}"] = getattr(
+            metrics, f"mean_{metric_attribute_name}")
+        result[f"median_{metric_attribute_name}"] = getattr(
+            metrics, f"median_{metric_attribute_name}")
+        result[f"std_{metric_attribute_name}"] = getattr(
+            metrics, f"std_{metric_attribute_name}")
+        for p, value in getattr(metrics,
+                                f"percentiles_{metric_attribute_name}"):
+            p_word = str(int(p)) if int(p) == p else str(p)
+            print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name}:",
+                                            value))
+            result[f"p{p_word}_{metric_attribute_name}"] = value
+
+    process_one_length("s_decode", "Decode", "Decode Speed (tok/s)")
+    process_one_metric("ttft", "TTFT", "Time to First Token")
+    process_one_metric("s_ttft", "S_TTFT", "Infer Time to First Token")
+    process_one_metric("tpot", "TPOT",
+                       "Time per Output Token (excl. 1st token)")
+    process_one_metric("itl", "ITL", "Inter-token Latency")
+    process_one_metric("s_itl", "S_ITL", "Infer Inter-token Latency")
+    process_one_metric("e2el", "E2EL", "End-to-end Latency")
+    process_one_metric("s_e2el", "S_E2EL", "Infer End-to-end Latency")
+    process_one_length("input_len", "Input Length", "Input Length")
+    process_one_length("s_input_len", "Input Length", "Infer Input Length")
+    process_one_length("output_len", "Output Length", "Output Length")
+
+    print("=" * 50)
+
+    return result
+
+
 def check_goodput_args(args):
     """Check whether the given argument has valid goodput configuration or not"""
     # Check and parse goodput arguments
@@ -759,6 +917,16 @@ def main(args: argparse.Namespace):
             lora_modules=args.lora_modules,
             extra_body=sampling_params,
         ))
+
+    # benchmark_result = benchmark_metrics(
+    #     benchmark_duration=3600,
+    #     result_file="your result file",
+    #     selected_percentile_metrics=args.percentile_metrics.split(","),
+    #     selected_percentiles=[
+    #         float(p) for p in args.metric_percentiles.split(",")
+    #     ],
+    #     goodput_config_dict=goodput_config_dict,
+    # )
 
     # Save config and results to json
     if args.save_result:
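
Note on the offline metrics path introduced above: each request handler appends str(output), the repr of a RequestFuncOutput dataclass, to a result file, and benchmark_metrics() later recovers those objects by slicing out the "RequestFuncOutput(...)" text and re-evaluating it before passing them to calculate_metrics(). The sketch below illustrates that save/reload round trip only; the file name "demo_result.txt" and the trimmed stand-in dataclass are assumptions for the example, not part of the patch.

# Minimal sketch of the RequestFuncOutput save/reload round trip used by benchmark_metrics().
# The file name and the trimmed dataclass below are illustrative, not taken from the diff.
from dataclasses import dataclass, field


@dataclass
class RequestFuncOutput:  # trimmed stand-in for the real dataclass in backend_request_func.py
    no: int = 0
    generated_text: str = ""
    success: bool = False
    latency: float = 0.0
    ttft: float = 0.0
    itl: list = field(default_factory=list)


# During a benchmark run, each handler appends the dataclass repr as one line.
with open("demo_result.txt", "w") as f:
    f.write(str(RequestFuncOutput(no=1, generated_text="hi", success=True)) + "\n")

# Offline re-analysis: slice out the repr and eval() it back into an object,
# mirroring the parsing loop inside benchmark_metrics(). eval() assumes the
# result file is trusted, since its contents are executed as Python.
outputs = []
with open("demo_result.txt") as f:
    for line in f.readlines():
        if "RequestFuncOutput" in line:
            start = line.find("RequestFuncOutput")
            end = line.rfind(")")
            outputs.append(eval(line[start:end + 1]))

print(outputs[0].no, outputs[0].success)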