[Feature] Support for request-level speculative decoding metrics monitoring. (#5518)

* support spec metrics monitor per request

* fix bug

* remove debug log

* fix ut bugs
This commit is contained in:
GoldPancake
2025-12-12 12:22:18 +08:00
committed by GitHub
parent 3c1f7b85a4
commit 909059c60a
11 changed files with 154 additions and 47 deletions

View File

@@ -30,7 +30,12 @@ from fastdeploy.engine.pooling_params import PoolingParams
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.entrypoints.openai.protocol import ToolCall
from fastdeploy.utils import data_processor_logger
from fastdeploy.worker.output import LogprobsLists, PromptLogprobs, SampleLogprobs
from fastdeploy.worker.output import (
LogprobsLists,
PromptLogprobs,
SampleLogprobs,
SpeculateMetrics,
)
class RequestStatus(Enum):
@@ -394,6 +399,7 @@ class CompletionOutput:
reasoning_content: Optional[str] = None
reasoning_token_num: Optional[int] = 0
tool_calls: Optional[ToolCall] = None
speculate_metrics: Optional[SpeculateMetrics] = None
def to_dict(self):
"""
@@ -514,6 +520,8 @@ class RequestMetrics:
llm_engine_send_req_to_engine_timestamp: Optional[float] = None
llm_engine_recv_latest_token_timestamp: Optional[float] = None
speculate_metrics: Optional[SpeculateMetrics] = None
def __post_init__(self):
if self.arrival_time is None:
self.arrival_time = time.time()
@@ -664,6 +672,8 @@ class RequestOutput:
self.outputs.draft_top_logprobs.sampled_token_ranks.extend(
next_output.outputs.draft_top_logprobs.sampled_token_ranks
)
if next_output.metrics.speculate_metrics is not None:
self.outputs.speculate_metrics = next_output.metrics.speculate_metrics
def __repr__(self) -> str:
return (

View File

@@ -31,7 +31,7 @@ from pydantic import (
)
from fastdeploy.engine.pooling_params import PoolingParams
from fastdeploy.worker.output import PromptLogprobs
from fastdeploy.worker.output import PromptLogprobs, SpeculateMetrics
class InvalidParameterException(Exception):
@@ -230,6 +230,7 @@ class ChatCompletionResponseChoice(BaseModel):
draft_logprobs: Optional[LogProbs] = None
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]]
speculate_metrics: Optional[SpeculateMetrics] = None
class ChatCompletionResponse(BaseModel):
@@ -295,6 +296,7 @@ class ChatCompletionResponseStreamChoice(BaseModel):
prompt_logprobs: Optional[PromptLogprobs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None
arrival_time: Optional[float] = None
speculate_metrics: Optional[SpeculateMetrics] = None
class ChatCompletionStreamResponse(BaseModel):
@@ -330,6 +332,7 @@ class CompletionResponseChoice(BaseModel):
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls"]]
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None
class CompletionResponse(BaseModel):
@@ -375,6 +378,7 @@ class CompletionResponseStreamChoice(BaseModel):
reasoning_content: Optional[str] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None
tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None
speculate_metrics: Optional[SpeculateMetrics] = None
class CompletionStreamResponse(BaseModel):

View File

@@ -57,6 +57,7 @@ from fastdeploy.worker.output import (
LogprobsLists,
LogprobsTensors,
PromptLogprobs,
SpeculateMetrics,
)
NONES = itertools.repeat(None)
@@ -384,6 +385,8 @@ class OpenAIServingChat:
request.include_logprobs_decode_token,
)
output_speculate_metrics = res["metrics"].get("speculate_metrics", None)
delta_message = DeltaMessage(
reasoning_content="",
prompt_token_ids=None,
@@ -415,6 +418,7 @@ class OpenAIServingChat:
logprobs=logprobs_res,
draft_logprobs=draft_logprobs_res,
arrival_time=arrival_time,
speculate_metrics=output_speculate_metrics,
)
if res["finished"]:
num_choices -= 1
@@ -537,6 +541,7 @@ class OpenAIServingChat:
decoder_base_url=self.tokenizer_base_url,
)
prompt_logprobs_res_list = [[] for _ in range(num_choices)]
speculate_metrics = [None for _ in range(num_choices)]
choices = []
while num_choices > 0:
if self.engine_client.check_model_weight_status():
@@ -613,6 +618,7 @@ class OpenAIServingChat:
)
if prompt_logprobs_res:
prompt_logprobs_res_list[idx].extend(clamp_prompt_logprobs(prompt_logprobs_res))
speculate_metrics[idx] = data["metrics"].get("speculate_metrics", None)
if data["finished"]:
num_choices -= 1
reasoning_num_tokens[idx] = data["outputs"].get("reasoning_token_num", 0)
@@ -635,6 +641,7 @@ class OpenAIServingChat:
response_processor=response_processor,
prompt_logprobs_res_list=prompt_logprobs_res_list,
max_tokens=max_tokens,
speculate_metrics=speculate_metrics[idx],
)
choices.append(choice)
finally:
@@ -688,6 +695,7 @@ class OpenAIServingChat:
prompt_logprobs_res_list: list,
response_processor: ChatResponseProcessor,
max_tokens: int,
speculate_metrics: SpeculateMetrics | None,
) -> ChatCompletionResponseChoice:
idx = int(data["request_id"].split("_")[-1])
output = data["outputs"]
@@ -745,6 +753,7 @@ class OpenAIServingChat:
draft_logprobs=draft_logprobs_full_res,
prompt_logprobs=prompt_logprobs_full_res,
finish_reason=finish_reason,
speculate_metrics=speculate_metrics,
)
def _create_chat_logprobs(

View File

@@ -261,6 +261,7 @@ class OpenAIServingCompletion:
aggregated_token_ids = [[] for _ in range(num_choices)]
aggregated_prompt_logprobs_tensors = [None] * num_choices
completion_batched_token_ids = [[] for _ in range(num_choices)]
aggregated_speculate_metrics = [None] * num_choices
current_waiting_time = 0
while num_choices > 0:
if self.engine_client.check_model_weight_status():
@@ -315,12 +316,18 @@ class OpenAIServingCompletion:
)
output_tokens[rid] += len(data["outputs"]["token_ids"])
completion_batched_token_ids[rid].extend(data["outputs"]["token_ids"])
output_speculate_metrics = data["metrics"].get("speculate_metrics", None)
if output_speculate_metrics is not None:
aggregated_speculate_metrics[rid] = output_speculate_metrics
if data.get("finished", False):
data["output_token_ids"] = output_tokens[rid]
data["outputs"]["top_logprobs"] = aggregated_top_logprobs[rid]
data["outputs"]["draft_top_logprobs"] = aggregated_draft_top_logprobs[rid]
data["outputs"]["token_ids"] = aggregated_token_ids[rid]
data["prompt_logprobs_tensors"] = aggregated_prompt_logprobs_tensors[rid]
data["speculate_metrics"] = aggregated_speculate_metrics[rid]
valid_results[rid] = data
num_choices -= 1
break
@@ -512,6 +519,7 @@ class OpenAIServingCompletion:
output_tokens[idx] += output.get("num_image_tokens")
num_image_tokens[idx] += output.get("num_image_tokens")
reasoning_tokens[idx] += output.get("reasoning_token_num", 0)
output_speculate_metrics = res["metrics"].get("speculate_metrics", None)
delta_message = CompletionResponseStreamChoice(
index=idx,
text=output["text"],
@@ -524,6 +532,7 @@ class OpenAIServingCompletion:
logprobs=logprobs_res,
prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res),
draft_logprobs=draft_logprobs_res,
speculate_metrics=output_speculate_metrics,
)
if not res["finished"] and "delta_message" in output:
delta_message_output = output["delta_message"]
@@ -686,6 +695,7 @@ class OpenAIServingCompletion:
draft_logprobs=aggregated_draft_logprobs,
prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res),
finish_reason=finish_reason,
speculate_metrics=final_res["metrics"].get("speculate_metrics", None),
)
choices.append(choice_data)

View File

@@ -143,9 +143,9 @@ class MetricsManager:
request_success_total: "Counter"
spec_decode_draft_acceptance_rate: "Gauge"
spec_decode_efficiency: "Gauge"
spec_decode_num_accepted_tokens_total: "Counter"
spec_decode_num_accepted_tokens_total: "Gauge"
spec_decode_num_draft_tokens_total: "Counter"
spec_decode_num_emitted_tokens_total: "Counter"
spec_decode_num_emitted_tokens_total: "Gauge"
spec_decode_draft_single_head_acceptance_rate: "list[Gauge]"
# for YIYAN Adapter
@@ -598,13 +598,13 @@ class MetricsManager:
"kwargs": {},
},
"spec_decode_num_accepted_tokens_total": {
"type": Counter,
"type": Gauge,
"name": "fastdeploy:spec_decode_num_accepted_tokens_total",
"description": "Total number of tokens accepted by the scoring model and verification program",
"kwargs": {},
},
"spec_decode_num_emitted_tokens_total": {
"type": Counter,
"type": Gauge,
"name": "fastdeploy:spec_decode_num_emitted_tokens_total",
"description": "Total number of tokens output by the entire system",
"kwargs": {},

View File

@@ -35,6 +35,7 @@ from fastdeploy.engine.request import (
Request,
RequestMetrics,
RequestOutput,
SpeculateMetrics,
)
from fastdeploy.inter_communicator import ZmqIpcServer
from fastdeploy.metrics.metrics import main_process_metrics
@@ -112,16 +113,13 @@ class TokenProcessor:
self.num_accepted_tokens = 0
self.num_emitted_tokens = 0
self.max_num_emitted_tokens = 0
self.num_rest_requests_per_head = [
0,
] * MAX_DRAFT_TOKENS
self.num_accept_requests_per_head = [
0,
] * MAX_DRAFT_TOKENS
self.executor = ThreadPoolExecutor(max_workers=1)
self.prefill_result_status = dict()
self._finalizer = weakref.finalize(self, self._cleanup_resources)
self._batch_result_buffer = None
self.total_step_per_request = {}
self.accept_token_num_per_head_per_request = {}
self.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS
def _cleanup_resources(self):
"""Cleaning up shared memory resources"""
@@ -504,7 +502,7 @@ class TokenProcessor:
if task_id in self.tokens_counter:
del self.tokens_counter[task_id]
def _compute_speculative_status(self):
def _compute_speculative_status(self, result: RequestOutput):
# TODO(liuzichang): Supplement more statistics
interval = 1
if self.speculative_stats_step % interval == 0:
@@ -517,13 +515,11 @@ class TokenProcessor:
if self.cfg.speculative_config.method in ["mtp"]:
single_head_acceptance_rates = []
for head in range(self.cfg.speculative_config.num_speculative_tokens):
if self.num_rest_requests_per_head[head] != 0:
for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1):
if self.accept_token_num_per_head[i - 1] != 0:
single_head_acceptance_rates.append(
self.num_accept_requests_per_head[head] / self.num_rest_requests_per_head[head]
self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1]
)
else:
single_head_acceptance_rates.append(0)
spec_logger.info(f" Single head accept ratio: {single_head_acceptance_rates}")
if self.number_of_output_tokens > 1000000:
@@ -531,6 +527,43 @@ class TokenProcessor:
self.total_step = 0
self.speculative_stats_step += 1
# For result
req_id = result.request_id
accept_num_list = self.accept_token_num_per_head_per_request[req_id]
req_total_step = self.total_step_per_request[req_id]
req_total_draft_tokens = req_total_step * (self.cfg.speculative_config.num_speculative_tokens + 1)
req_accepted_tokens = sum(accept_num_list)
req_rejected_tokens = req_total_draft_tokens - req_accepted_tokens
req_accept_ratio = 1 - req_total_step / req_accepted_tokens
req_avg_accept_length = req_accepted_tokens / req_total_step
accept_ratio_per_head = []
for i in range(1, len(accept_num_list)):
if accept_num_list[i - 1] != 0:
accept_ratio_per_head.append(accept_num_list[i] / accept_num_list[i - 1])
else:
accept_ratio_per_head.append(0)
result.metrics.speculate_metrics = SpeculateMetrics(
accepted_tokens=req_accepted_tokens,
rejected_tokens=req_rejected_tokens,
accept_ratio=req_accept_ratio,
average_accept_length=req_avg_accept_length,
accept_ratio_per_head=accept_ratio_per_head[: self.cfg.speculative_config.num_speculative_tokens],
)
# Log
spec_logger.debug(
f"req_id: {result.request_id}, total_step: {req_total_step}, "
f"accept_ratio: {accept_ratio}, average_accept_lenght: {req_avg_accept_length},"
f"accepted_tokens: {req_accepted_tokens}, rejected_tokens: {req_rejected_tokens}"
f"accept_ratio_per_head: {accept_ratio_per_head}"
)
# Clear request record
self.accept_token_num_per_head_per_request.pop(req_id)
self.total_step_per_request.pop(req_id)
def _process_batch_draft_tokens(self, mtype, batch, accept_num, tokens, scores, ranks):
"""
Process batch draft tokens and generate corresponding request outputs
@@ -618,7 +651,6 @@ class TokenProcessor:
else:
batch = self.output_tokens[1]
accept_num = tokens[2 : batch + 2]
self._record_speculative_decoding_metrics(accept_num)
elif self.use_logprobs:
batch = self.output_tokens[1, 0]
tokens = tokens[2 : batch * (K + 1) + 2].reshape([batch, K + 1])[:, : (K + 1)]
@@ -642,6 +674,7 @@ class TokenProcessor:
is_decode = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "decode"
if self.cfg.speculative_config.method:
self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i])
if accept_num[i] == -3:
recovery_stop = True
if recovery_stop:
@@ -776,7 +809,7 @@ class TokenProcessor:
)
llm_logger.info(f"{self.resource_manager.info()}")
if self.cfg.speculative_config.method:
self._compute_speculative_status()
self._compute_speculative_status(result)
if not is_prefill:
self._record_completion_metrics(task, current_time)
self._recycle_resources(task_id, i, task, result, is_prefill)
@@ -785,6 +818,8 @@ class TokenProcessor:
llm_logger.debug(f"get response from infer: {result}")
batch_result.append(result)
if self.cfg.speculative_config.method:
self._record_speculative_decoding_metrics(accept_num)
self.postprocess(batch_result, mtype)
def _record_metrics(self, task, current_time, token_ids):
@@ -828,15 +863,13 @@ class TokenProcessor:
)
real_accept_num = [x for x in accept_num if x > 0]
num_accepted_tokens = sum([x - 1 for x in real_accept_num])
self.num_accepted_tokens += num_accepted_tokens
num_emitted_tokens = sum(real_accept_num)
if num_emitted_tokens == 0:
self.num_accepted_tokens = sum(self.accept_token_num_per_head[1:])
self.num_emitted_tokens = sum(self.accept_token_num_per_head)
if self.num_emitted_tokens == 0:
return
self.num_emitted_tokens += num_emitted_tokens
main_process_metrics.spec_decode_num_accepted_tokens_total.inc(num_accepted_tokens)
main_process_metrics.spec_decode_num_emitted_tokens_total.inc(num_emitted_tokens)
main_process_metrics.spec_decode_num_accepted_tokens_total.set(self.num_accepted_tokens)
main_process_metrics.spec_decode_num_emitted_tokens_total.set(self.num_emitted_tokens)
if self.cfg.speculative_config.method in ["ngram"]:
main_process_metrics.spec_decode_draft_acceptance_rate.set(
@@ -857,25 +890,26 @@ class TokenProcessor:
main_process_metrics.spec_decode_efficiency.set(self.num_emitted_tokens / self.max_num_emitted_tokens)
main_process_metrics.spec_decode_num_draft_tokens_total.inc(num_draft_tokens)
num_rest_requests = len(real_accept_num)
for head in range(self.cfg.speculative_config.num_speculative_tokens):
num_accept_requests = len([x for x in real_accept_num if x >= head + 2])
# Accumulate the number of requests for each head
self.num_accept_requests_per_head[head] += num_accept_requests
self.num_rest_requests_per_head[head] += num_rest_requests
# Update the rest requests for each head
num_rest_requests = num_accept_requests
# Calculate the acceptance rate for each head
if self.num_rest_requests_per_head[head] != 0:
for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1):
if self.accept_token_num_per_head[i - 1] != 0:
single_head_acceptance_rate = (
self.num_accept_requests_per_head[head] / self.num_rest_requests_per_head[head]
self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1]
)
else:
single_head_acceptance_rate = 0
main_process_metrics.spec_decode_draft_single_head_acceptance_rate[head].set(
main_process_metrics.spec_decode_draft_single_head_acceptance_rate[i - 1].set(
single_head_acceptance_rate
)
def _record_speculative_decoding_accept_num_per_request(self, req_id, accept_num):
if req_id not in self.total_step_per_request:
self.total_step_per_request[req_id] = 0
if req_id not in self.accept_token_num_per_head_per_request:
self.accept_token_num_per_head_per_request[req_id] = [0] * MAX_DRAFT_TOKENS
self.total_step_per_request[req_id] += 1
for i in range(accept_num):
self.accept_token_num_per_head_per_request[req_id][i] += 1
self.accept_token_num_per_head[i] += 1
def clear_data(self):
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.clear_data()

View File

@@ -128,6 +128,38 @@ class LogprobsTensors(NamedTuple):
PromptLogprobs = LogprobsTensors | list[dict[int, Logprob] | None]
@dataclass
class SpeculateMetrics:
"""
Speculative decoding metrics
"""
"""
The number of accepted tokens in the current request
"""
accepted_tokens: int
"""
The number of rejected tokens in the current request
"""
rejected_tokens: int
"""
The acceptance rate of the current request
"""
accept_ratio: float
"""
Average number of accepted tokens per step for the current request
"""
average_accept_length: float
"""
Average acceptance rate of each head in the current request
"""
accept_ratio_per_head: list[float]
@dataclass
class SamplerOutput:
""" """

View File

@@ -46,6 +46,7 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase):
"finished": True,
},
"output_token_ids": 3,
"metrics": {},
}
self.mock_engine.generate.return_value = [mock_output]
@@ -80,6 +81,7 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase):
"finished": True,
},
"output_token_ids": 3,
"metrics": {},
}
self.mock_engine.generate.return_value = [mock_output]
@@ -109,10 +111,12 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase):
{
"outputs": {"text": " response1", "token_ids": [1, 2], "top_logprobs": None, "finished": True},
"output_token_ids": 2,
"metrics": {},
},
{
"outputs": {"text": " response2", "token_ids": [3, 4], "top_logprobs": None, "finished": True},
"output_token_ids": 2,
"metrics": {},
},
]
self.mock_engine.generate.return_value = mock_outputs
@@ -146,10 +150,12 @@ class TestCompletionEcho(unittest.IsolatedAsyncioTestCase):
{
"outputs": {"text": " response1", "token_ids": [1, 2], "top_logprobs": None, "finished": True},
"output_token_ids": 2,
"metrics": {},
},
{
"outputs": {"text": " response2", "token_ids": [3, 4], "top_logprobs": None, "finished": True},
"output_token_ids": 2,
"metrics": {},
},
]
self.mock_engine.generate.return_value = mock_outputs

View File

@@ -321,6 +321,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase):
],
},
"finished": True,
"metrics": {},
},
{
"request_id": "test_request_id_1",
@@ -334,6 +335,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase):
],
},
"finished": True,
"metrics": {},
},
]
@@ -493,6 +495,7 @@ class TestMaxStreamingResponseTokens(IsolatedAsyncioTestCase):
prompt_logprobs_res_list=prompt_logprobs_res_list,
response_processor=mock_response_processor,
max_tokens=max_tokens_list[idx],
speculate_metrics=None,
)
expected = case["expected"]

View File

@@ -129,6 +129,7 @@ class TestOpenAIServingCompletion(unittest.IsolatedAsyncioTestCase):
"reasoning_token_num": 10,
},
"output_token_ids": 3,
"metrics": {},
},
{
"outputs": {
@@ -141,6 +142,7 @@ class TestOpenAIServingCompletion(unittest.IsolatedAsyncioTestCase):
"reasoning_token_num": 20,
},
"output_token_ids": 3,
"metrics": {},
},
]

View File

@@ -142,13 +142,10 @@ class TestTokenProcessorProcessBatchOutput(unittest.TestCase):
processor.num_accepted_tokens = 0
processor.num_emitted_tokens = 0
processor.max_num_emitted_tokens = 0
processor.num_rest_requests_per_head = [
0,
] * MAX_DRAFT_TOKENS
processor.num_accept_requests_per_head = [
0,
] * MAX_DRAFT_TOKENS
processor.speculative_stats_step = 0
processor.total_step_per_request = {}
processor.accept_token_num_per_head_per_request = {}
processor.accept_token_num_per_head = [0] * MAX_DRAFT_TOKENS
# processor._recycle_resources = Mock()