diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index b9a141bc9..d871de533 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -30,7 +30,12 @@ from fastdeploy.engine.pooling_params import PoolingParams from fastdeploy.engine.sampling_params import SamplingParams from fastdeploy.entrypoints.openai.protocol import ToolCall from fastdeploy.utils import data_processor_logger -from fastdeploy.worker.output import LogprobsLists, PromptLogprobs, SampleLogprobs +from fastdeploy.worker.output import ( + LogprobsLists, + PromptLogprobs, + SampleLogprobs, + SpeculateMetrics, +) class RequestStatus(Enum): @@ -394,6 +399,7 @@ class CompletionOutput: reasoning_content: Optional[str] = None reasoning_token_num: Optional[int] = 0 tool_calls: Optional[ToolCall] = None + speculate_metrics: Optional[SpeculateMetrics] = None def to_dict(self): """ @@ -514,6 +520,8 @@ class RequestMetrics: llm_engine_send_req_to_engine_timestamp: Optional[float] = None llm_engine_recv_latest_token_timestamp: Optional[float] = None + speculate_metrics: Optional[SpeculateMetrics] = None + def __post_init__(self): if self.arrival_time is None: self.arrival_time = time.time() @@ -664,6 +672,8 @@ class RequestOutput: self.outputs.draft_top_logprobs.sampled_token_ranks.extend( next_output.outputs.draft_top_logprobs.sampled_token_ranks ) + if next_output.metrics.speculate_metrics is not None: + self.outputs.speculate_metrics = next_output.metrics.speculate_metrics def __repr__(self) -> str: return ( diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 199dc4487..b103b4e3e 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -31,7 +31,7 @@ from pydantic import ( ) from fastdeploy.engine.pooling_params import PoolingParams -from fastdeploy.worker.output import PromptLogprobs +from fastdeploy.worker.output import PromptLogprobs, SpeculateMetrics class InvalidParameterException(Exception): @@ -230,6 +230,7 @@ class ChatCompletionResponseChoice(BaseModel): draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] + speculate_metrics: Optional[SpeculateMetrics] = None class ChatCompletionResponse(BaseModel): @@ -295,6 +296,7 @@ class ChatCompletionResponseStreamChoice(BaseModel): prompt_logprobs: Optional[PromptLogprobs] = None finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None arrival_time: Optional[float] = None + speculate_metrics: Optional[SpeculateMetrics] = None class ChatCompletionStreamResponse(BaseModel): @@ -330,6 +332,7 @@ class CompletionResponseChoice(BaseModel): reasoning_content: Optional[str] = None finish_reason: Optional[Literal["stop", "length", "tool_calls"]] tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None + speculate_metrics: Optional[SpeculateMetrics] = None class CompletionResponse(BaseModel): @@ -375,6 +378,7 @@ class CompletionResponseStreamChoice(BaseModel): reasoning_content: Optional[str] = None finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None + speculate_metrics: Optional[SpeculateMetrics] = None class CompletionStreamResponse(BaseModel): diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 50876b2cf..4ef89f881 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -57,6 +57,7 @@ from fastdeploy.worker.output import ( LogprobsLists, LogprobsTensors, PromptLogprobs, + SpeculateMetrics, ) NONES = itertools.repeat(None) @@ -384,6 +385,8 @@ class OpenAIServingChat: request.include_logprobs_decode_token, ) + output_speculate_metrics = res["metrics"].get("speculate_metrics", None) + delta_message = DeltaMessage( reasoning_content="", prompt_token_ids=None, @@ -415,6 +418,7 @@ class OpenAIServingChat: logprobs=logprobs_res, draft_logprobs=draft_logprobs_res, arrival_time=arrival_time, + speculate_metrics=output_speculate_metrics, ) if res["finished"]: num_choices -= 1 @@ -537,6 +541,7 @@ class OpenAIServingChat: decoder_base_url=self.tokenizer_base_url, ) prompt_logprobs_res_list = [[] for _ in range(num_choices)] + speculate_metrics = [None for _ in range(num_choices)] choices = [] while num_choices > 0: if self.engine_client.check_model_weight_status(): @@ -613,6 +618,7 @@ class OpenAIServingChat: ) if prompt_logprobs_res: prompt_logprobs_res_list[idx].extend(clamp_prompt_logprobs(prompt_logprobs_res)) + speculate_metrics[idx] = data["metrics"].get("speculate_metrics", None) if data["finished"]: num_choices -= 1 reasoning_num_tokens[idx] = data["outputs"].get("reasoning_token_num", 0) @@ -635,6 +641,7 @@ class OpenAIServingChat: response_processor=response_processor, prompt_logprobs_res_list=prompt_logprobs_res_list, max_tokens=max_tokens, + speculate_metrics=speculate_metrics[idx], ) choices.append(choice) finally: @@ -688,6 +695,7 @@ class OpenAIServingChat: prompt_logprobs_res_list: list, response_processor: ChatResponseProcessor, max_tokens: int, + speculate_metrics: SpeculateMetrics | None, ) -> ChatCompletionResponseChoice: idx = int(data["request_id"].split("_")[-1]) output = data["outputs"] @@ -745,6 +753,7 @@ class OpenAIServingChat: draft_logprobs=draft_logprobs_full_res, prompt_logprobs=prompt_logprobs_full_res, finish_reason=finish_reason, + speculate_metrics=speculate_metrics, ) def _create_chat_logprobs( diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index c8a9f1efe..0f0ee3b25 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -261,6 +261,7 @@ class OpenAIServingCompletion: aggregated_token_ids = [[] for _ in range(num_choices)] aggregated_prompt_logprobs_tensors = [None] * num_choices completion_batched_token_ids = [[] for _ in range(num_choices)] + aggregated_speculate_metrics = [None] * num_choices current_waiting_time = 0 while num_choices > 0: if self.engine_client.check_model_weight_status(): @@ -315,12 +316,18 @@ class OpenAIServingCompletion: ) output_tokens[rid] += len(data["outputs"]["token_ids"]) completion_batched_token_ids[rid].extend(data["outputs"]["token_ids"]) + + output_speculate_metrics = data["metrics"].get("speculate_metrics", None) + if output_speculate_metrics is not None: + aggregated_speculate_metrics[rid] = output_speculate_metrics + if data.get("finished", False): data["output_token_ids"] = output_tokens[rid] data["outputs"]["top_logprobs"] = aggregated_top_logprobs[rid] data["outputs"]["draft_top_logprobs"] = aggregated_draft_top_logprobs[rid] data["outputs"]["token_ids"] = aggregated_token_ids[rid] data["prompt_logprobs_tensors"] = aggregated_prompt_logprobs_tensors[rid] + data["speculate_metrics"] = aggregated_speculate_metrics[rid] valid_results[rid] = data num_choices -= 1 break @@ -512,6 +519,7 @@ class OpenAIServingCompletion: output_tokens[idx] += output.get("num_image_tokens") num_image_tokens[idx] += output.get("num_image_tokens") reasoning_tokens[idx] += output.get("reasoning_token_num", 0) + output_speculate_metrics = res["metrics"].get("speculate_metrics", None) delta_message = CompletionResponseStreamChoice( index=idx, text=output["text"], @@ -524,6 +532,7 @@ class OpenAIServingCompletion: logprobs=logprobs_res, prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res), draft_logprobs=draft_logprobs_res, + speculate_metrics=output_speculate_metrics, ) if not res["finished"] and "delta_message" in output: delta_message_output = output["delta_message"] @@ -686,6 +695,7 @@ class OpenAIServingCompletion: draft_logprobs=aggregated_draft_logprobs, prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res), finish_reason=finish_reason, + speculate_metrics=final_res["metrics"].get("speculate_metrics", None), ) choices.append(choice_data) diff --git a/fastdeploy/metrics/metrics.py b/fastdeploy/metrics/metrics.py index ec89e8383..4da49a966 100644 --- a/fastdeploy/metrics/metrics.py +++ b/fastdeploy/metrics/metrics.py @@ -143,9 +143,9 @@ class MetricsManager: request_success_total: "Counter" spec_decode_draft_acceptance_rate: "Gauge" spec_decode_efficiency: "Gauge" - spec_decode_num_accepted_tokens_total: "Counter" + spec_decode_num_accepted_tokens_total: "Gauge" spec_decode_num_draft_tokens_total: "Counter" - spec_decode_num_emitted_tokens_total: "Counter" + spec_decode_num_emitted_tokens_total: "Gauge" spec_decode_draft_single_head_acceptance_rate: "list[Gauge]" # for YIYAN Adapter @@ -598,13 +598,13 @@ class MetricsManager: "kwargs": {}, }, "spec_decode_num_accepted_tokens_total": { - "type": Counter, + "type": Gauge, "name": "fastdeploy:spec_decode_num_accepted_tokens_total", "description": "Total number of tokens accepted by the scoring model and verification program", "kwargs": {}, }, "spec_decode_num_emitted_tokens_total": { - "type": Counter, + "type": Gauge, "name": "fastdeploy:spec_decode_num_emitted_tokens_total", "description": "Total number of tokens output by the entire system", "kwargs": {}, diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 6286451d9..d7afd1947 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -35,6 +35,7 @@ from fastdeploy.engine.request import ( Request, RequestMetrics, RequestOutput, + SpeculateMetrics, ) from fastdeploy.inter_communicator import ZmqIpcServer from fastdeploy.metrics.metrics import main_process_metrics @@ -112,16 +113,13 @@ class TokenProcessor: self.num_accepted_tokens = 0 self.num_emitted_tokens = 0 self.max_num_emitted_tokens = 0 - self.num_rest_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS - self.num_accept_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS self.executor = ThreadPoolExecutor(max_workers=1) self.prefill_result_status = dict() self._finalizer = weakref.finalize(self, self._cleanup_resources) self._batch_result_buffer = None + self.total_step_per_request = {} + self.accept_token_num_per_head_per_request = {} + self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS def _cleanup_resources(self): """Cleaning up shared memory resources""" @@ -504,7 +502,7 @@ class TokenProcessor: if task_id in self.tokens_counter: del self.tokens_counter[task_id] - def _compute_speculative_status(self): + def _compute_speculative_status(self, result: RequestOutput): # TODO(liuzichang): Supplement more statistics interval = 1 if self.speculative_stats_step % interval == 0: @@ -517,13 +515,11 @@ class TokenProcessor: if self.cfg.speculative_config.method in ["mtp"]: single_head_acceptance_rates = [] - for head in range(self.cfg.speculative_config.num_speculative_tokens): - if self.num_rest_requests_per_head[head] != 0: + for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1): + if self.accept_token_num_per_head[i - 1] != 0: single_head_acceptance_rates.append( - self.num_accept_requests_per_head[head] / self.num_rest_requests_per_head[head] + self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1] ) - else: - single_head_acceptance_rates.append(0) spec_logger.info(f" Single head accept ratio: {single_head_acceptance_rates}") if self.number_of_output_tokens > 1000000: @@ -531,6 +527,43 @@ class TokenProcessor: self.total_step = 0 self.speculative_stats_step += 1 + # For result + req_id = result.request_id + accept_num_list = self.accept_token_num_per_head_per_request[req_id] + req_total_step = self.total_step_per_request[req_id] + req_total_draft_tokens = req_total_step * (self.cfg.speculative_config.num_speculative_tokens + 1) + req_accepted_tokens = sum(accept_num_list) + req_rejected_tokens = req_total_draft_tokens - req_accepted_tokens + req_accept_ratio = 1 - req_total_step / req_accepted_tokens + req_avg_accept_length = req_accepted_tokens / req_total_step + + accept_ratio_per_head = [] + for i in range(1, len(accept_num_list)): + if accept_num_list[i - 1] != 0: + accept_ratio_per_head.append(accept_num_list[i] / accept_num_list[i - 1]) + else: + accept_ratio_per_head.append(0) + + result.metrics.speculate_metrics = SpeculateMetrics( + accepted_tokens=req_accepted_tokens, + rejected_tokens=req_rejected_tokens, + accept_ratio=req_accept_ratio, + average_accept_length=req_avg_accept_length, + accept_ratio_per_head=accept_ratio_per_head[: self.cfg.speculative_config.num_speculative_tokens], + ) + + # Log + spec_logger.debug( + f"req_id: {result.request_id}, total_step: {req_total_step}, " + f"accept_ratio: {accept_ratio}, average_accept_lenght: {req_avg_accept_length}," + f"accepted_tokens: {req_accepted_tokens}, rejected_tokens: {req_rejected_tokens}" + f"accept_ratio_per_head: {accept_ratio_per_head}" + ) + + # Clear request record + self.accept_token_num_per_head_per_request.pop(req_id) + self.total_step_per_request.pop(req_id) + def _process_batch_draft_tokens(self, mtype, batch, accept_num, tokens, scores, ranks): """ Process batch draft tokens and generate corresponding request outputs @@ -618,7 +651,6 @@ class TokenProcessor: else: batch = self.output_tokens[1] accept_num = tokens[2 : batch + 2] - self._record_speculative_decoding_metrics(accept_num) elif self.use_logprobs: batch = self.output_tokens[1, 0] tokens = tokens[2 : batch * (K + 1) + 2].reshape([batch, K + 1])[:, : (K + 1)] @@ -642,6 +674,7 @@ class TokenProcessor: is_decode = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "decode" if self.cfg.speculative_config.method: + self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i]) if accept_num[i] == -3: recovery_stop = True if recovery_stop: @@ -776,7 +809,7 @@ class TokenProcessor: ) llm_logger.info(f"{self.resource_manager.info()}") if self.cfg.speculative_config.method: - self._compute_speculative_status() + self._compute_speculative_status(result) if not is_prefill: self._record_completion_metrics(task, current_time) self._recycle_resources(task_id, i, task, result, is_prefill) @@ -785,6 +818,8 @@ class TokenProcessor: llm_logger.debug(f"get response from infer: {result}") batch_result.append(result) + if self.cfg.speculative_config.method: + self._record_speculative_decoding_metrics(accept_num) self.postprocess(batch_result, mtype) def _record_metrics(self, task, current_time, token_ids): @@ -828,15 +863,13 @@ class TokenProcessor: ) real_accept_num = [x for x in accept_num if x > 0] - num_accepted_tokens = sum([x - 1 for x in real_accept_num]) - self.num_accepted_tokens += num_accepted_tokens - num_emitted_tokens = sum(real_accept_num) - if num_emitted_tokens == 0: + self.num_accepted_tokens = sum(self.accept_token_num_per_head[1:]) + self.num_emitted_tokens = sum(self.accept_token_num_per_head) + if self.num_emitted_tokens == 0: return - self.num_emitted_tokens += num_emitted_tokens - main_process_metrics.spec_decode_num_accepted_tokens_total.inc(num_accepted_tokens) - main_process_metrics.spec_decode_num_emitted_tokens_total.inc(num_emitted_tokens) + main_process_metrics.spec_decode_num_accepted_tokens_total.set(self.num_accepted_tokens) + main_process_metrics.spec_decode_num_emitted_tokens_total.set(self.num_emitted_tokens) if self.cfg.speculative_config.method in ["ngram"]: main_process_metrics.spec_decode_draft_acceptance_rate.set( @@ -857,25 +890,26 @@ class TokenProcessor: main_process_metrics.spec_decode_efficiency.set(self.num_emitted_tokens / self.max_num_emitted_tokens) main_process_metrics.spec_decode_num_draft_tokens_total.inc(num_draft_tokens) - num_rest_requests = len(real_accept_num) - for head in range(self.cfg.speculative_config.num_speculative_tokens): - num_accept_requests = len([x for x in real_accept_num if x >= head + 2]) - # Accumulate the number of requests for each head - self.num_accept_requests_per_head[head] += num_accept_requests - self.num_rest_requests_per_head[head] += num_rest_requests - # Update the rest requests for each head - num_rest_requests = num_accept_requests - # Calculate the acceptance rate for each head - if self.num_rest_requests_per_head[head] != 0: + for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1): + if self.accept_token_num_per_head[i - 1] != 0: single_head_acceptance_rate = ( - self.num_accept_requests_per_head[head] / self.num_rest_requests_per_head[head] + self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1] ) - else: - single_head_acceptance_rate = 0 - main_process_metrics.spec_decode_draft_single_head_acceptance_rate[head].set( + main_process_metrics.spec_decode_draft_single_head_acceptance_rate[i - 1].set( single_head_acceptance_rate ) + def _record_speculative_decoding_accept_num_per_request(self, req_id, accept_num): + if req_id not in self.total_step_per_request: + self.total_step_per_request[req_id] = 0 + if req_id not in self.accept_token_num_per_head_per_request: + self.accept_token_num_per_head_per_request[req_id] = [0] * MAX_DRAFT_TOKENS + + self.total_step_per_request[req_id] += 1 + for i in range(accept_num): + self.accept_token_num_per_head_per_request[req_id][i] += 1 + self.accept_token_num_per_head[i] += 1 + def clear_data(self): if envs.ENABLE_V1_KVCACHE_SCHEDULER: self.resource_manager.clear_data() diff --git a/fastdeploy/worker/output.py b/fastdeploy/worker/output.py index bd7e7ce1d..13d822b09 100644 --- a/fastdeploy/worker/output.py +++ b/fastdeploy/worker/output.py @@ -128,6 +128,38 @@ class LogprobsTensors(NamedTuple): PromptLogprobs = LogprobsTensors | list[dict[int, Logprob] | None] +@dataclass +class SpeculateMetrics: + """ + Speculative decoding metrics + """ + + """ + The number of accepted tokens in the current request + """ + accepted_tokens: int + + """ + The number of rejected tokens in the current request + """ + rejected_tokens: int + + """ + The acceptance rate of the current request + """ + accept_ratio: float + + """ + Average number of accepted tokens per step for the current request + """ + average_accept_length: float + + """ + Average acceptance rate of each head in the current request + """ + accept_ratio_per_head: list[float] + + @dataclass class SamplerOutput: """ """ diff --git a/tests/entrypoints/openai/test_completion_echo.py b/tests/entrypoints/openai/test_completion_echo.py index 679f6d8ec..087d159d7 100644 --- a/tests/entrypoints/openai/test_completion_echo.py +++ b/tests/entrypoints/openai/test_completion_echo.py @@ -46,6 +46,7 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): "finished": True, }, "output_token_ids": 3, + "metrics": {}, } self.mock_engine.generate.return_value = [mock_output] @@ -80,6 +81,7 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): "finished": True, }, "output_token_ids": 3, + "metrics": {}, } self.mock_engine.generate.return_value = [mock_output] @@ -109,10 +111,12 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): { "outputs": {"text": " response1", "token_ids": [1, 2], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, { "outputs": {"text": " response2", "token_ids": [3, 4], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, ] self.mock_engine.generate.return_value = mock_outputs @@ -146,10 +150,12 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase): { "outputs": {"text": " response1", "token_ids": [1, 2], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, { "outputs": {"text": " response2", "token_ids": [3, 4], "top_logprobs": None, "finished": True}, "output_token_ids": 2, + "metrics": {}, }, ] self.mock_engine.generate.return_value = mock_outputs diff --git a/tests/entrypoints/openai/test_max_streaming_tokens.py b/tests/entrypoints/openai/test_max_streaming_tokens.py index a7a45d456..6b25eaffd 100644 --- a/tests/entrypoints/openai/test_max_streaming_tokens.py +++ b/tests/entrypoints/openai/test_max_streaming_tokens.py @@ -321,6 +321,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase): ], }, "finished": True, + "metrics": {}, }, { "request_id": "test_request_id_1", @@ -334,6 +335,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase): ], }, "finished": True, + "metrics": {}, }, ] @@ -493,6 +495,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase): prompt_logprobs_res_list=prompt_logprobs_res_list, response_processor=mock_response_processor, max_tokens=max_tokens_list[idx], + speculate_metrics=None, ) expected = case["expected"] diff --git a/tests/entrypoints/openai/test_serving_completion.py b/tests/entrypoints/openai/test_serving_completion.py index 680d775bd..761213d1d 100644 --- a/tests/entrypoints/openai/test_serving_completion.py +++ b/tests/entrypoints/openai/test_serving_completion.py @@ -129,6 +129,7 @@ class TestOpenAIServingCompletion(unittest.IsolatedAsyncioTestCase): "reasoning_token_num": 10, }, "output_token_ids": 3, + "metrics": {}, }, { "outputs": { @@ -141,6 +142,7 @@ class TestOpenAIServingCompletion(unittest.IsolatedAsyncioTestCase): "reasoning_token_num": 20, }, "output_token_ids": 3, + "metrics": {}, }, ] diff --git a/tests/output/test_process_batch_output.py b/tests/output/test_process_batch_output.py index 9a1d06db7..ee25341b6 100644 --- a/tests/output/test_process_batch_output.py +++ b/tests/output/test_process_batch_output.py @@ -142,13 +142,10 @@ class TestTokenProcessorProcessBatchOutput(unittest.TestCase): processor.num_accepted_tokens = 0 processor.num_emitted_tokens = 0 processor.max_num_emitted_tokens = 0 - processor.num_rest_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS - processor.num_accept_requests_per_head = [ - 0, - ] * MAX_DRAFT_TOKENS processor.speculative_stats_step = 0 + processor.total_step_per_request = {} + processor.accept_token_num_per_head_per_request = {} + processor.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS # processor._recycle_resources = Mock()