[Cherry-Pick] support model_id as metric labels by redefining metric update interface (#4480)

* [metric] support model_id as metric labels by redefining metric update interface

* [feat] support FD_DEFAULT_METRIC_LABEL_VALUES
Author: 李泳桦
Date: 2025-10-20 16:11:19 +08:00 (committed by GitHub)
Parent: b3225f9a87
Commit: 561a7ebc0b
14 changed files with 184 additions and 75 deletions
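The diffs below replace direct attribute access on Prometheus metric objects (metric.set/.inc/.observe) with a name-based interface (set_value, inc_value, dec_value, obs_value) so that a model_id label can be attached transparently. A minimal standalone sketch of that call-site pattern, using only prometheus_client; the demo manager class and the label value are illustrative stand-ins, not FastDeploy code:

from prometheus_client import Gauge

class DemoMetricsManager:
    def __init__(self):
        self.num_requests_running = Gauge(
            "demo_num_requests_running", "Running requests", labelnames=["model_id"]
        )

    def inc_value(self, name, value=1, labelvalues=None):
        # Look the metric up by name and apply label values, as the new interface does.
        metric = getattr(self, name)
        metric.labels(**(labelvalues or {"model_id": ""})).inc(value)

demo = DemoMetricsManager()
# Old call-site style: demo.num_requests_running.inc(1)
demo.inc_value("num_requests_running", 1, labelvalues={"model_id": "my-model"})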

View File

@@ -55,10 +55,10 @@ class CacheMetrics:
self.cpu_hit_token_ratio = self.total_cpu_matched_token_num / self.total_token_num
self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num
main_process_metrics.hit_req_rate.set(self.hit_req_ratio)
main_process_metrics.hit_token_rate.set(self.hit_token_ratio)
main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio)
main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio)
main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio)
main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio)
main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio)
main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio)
logger.info(
f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}"

View File

@@ -111,6 +111,11 @@ class PrefixCacheManager:
+ f"{self.num_cpu_blocks}, bytes_per_layer_per_block {self.cache_config.bytes_per_layer_per_block}"
)
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)
@property
def available_gpu_resource(self):
return len(self.gpu_free_block_list) / self.num_gpu_blocks if self.num_gpu_blocks > 0 else 0.0
@@ -316,8 +321,10 @@ class PrefixCacheManager:
heapq.heapify(self.gpu_free_block_list)
self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks))
main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)
def _enable_cpu_cache(self):
"""
@@ -359,8 +366,8 @@ class PrefixCacheManager:
logger.info(
f"allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
)
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
return allocated_block_ids
def recycle_gpu_blocks(self, gpu_block_ids):
@@ -375,8 +382,8 @@ class PrefixCacheManager:
heapq.heappush(self.gpu_free_block_list, gpu_block_id)
else:
heapq.heappush(self.gpu_free_block_list, gpu_block_ids)
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
def allocate_cpu_blocks(self, num_blocks):
"""

View File

@@ -562,8 +562,8 @@ class EngineService:
else:
continue
main_process_metrics.num_requests_waiting.dec(len(tasks))
main_process_metrics.num_requests_running.inc(len(tasks))
main_process_metrics.dec_value("num_requests_waiting", len(tasks))
main_process_metrics.inc_value("num_requests_running", len(tasks))
except Exception as e:
err_msg = f"Error happend while insert task to engine: {e}, {traceback.format_exc()!s}."
self.llm_logger.error(err_msg)
@@ -744,7 +744,7 @@ class EngineService:
try:
request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
main_process_metrics.requests_number.inc()
main_process_metrics.inc_value("requests_number", 1)
self.llm_logger.debug(f"Receive request: {request}")
except Exception as e:
self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}")
@@ -775,7 +775,7 @@ class EngineService:
added_requests.pop(request_id)
if failed is None:
main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.inc_value("num_requests_waiting", 1)
continue
error_result = RequestOutput(

View File

@@ -59,7 +59,8 @@ class ResourceManager:
# current batch status of the engine
self.real_bsz = 0
llm_logger.info(f"{self.info()}")
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)
main_process_metrics.set_value("available_batch_size", self.available_batch())
def reset_cache_config(self, cfg):
"""
@@ -169,7 +170,7 @@ class ResourceManager:
ori_number = self.available_block_num()
self.cache_manager.recycle_gpu_blocks(block_tables)
cur_number = self.available_block_num()
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.")
def available_batch(self):
@@ -311,15 +312,15 @@ class ResourceManager:
break
# record batch size here
task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(
f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}"
)
llm_logger.info(f"{self.info()}")
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
return processed_tasks
@@ -347,9 +348,9 @@ class ResourceManager:
task.cache_info = (cache_block_num, no_cache_block_num)
# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num)
cached_len = len(common_block_ids) * self.cfg.block_size
task.block_tables = common_block_ids + unique_block_ids

View File

@@ -91,7 +91,7 @@ class ResourceManagerV1(ResourceManager):
self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
self.lock = threading.Lock()
self.to_be_rescheduled_request_id_set = set()
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)
self.using_extend_tables_req_id = set()
@@ -144,13 +144,12 @@ class ResourceManagerV1(ResourceManager):
if preempted_req.request_id in self.req_dict:
del self.req_dict[preempted_req.request_id]
self._free_blocks(preempted_req)
main_process_metrics.num_requests_running.dec(1)
llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
else:
self._free_blocks(preempted_req)
preempted_req.cached_block_num = 0
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.num_requests_running.dec(1)
llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
if preempted_req == request:
@@ -414,8 +413,6 @@ class ResourceManagerV1(ResourceManager):
request, self.config.cache_config.block_size, request.num_computed_tokens
)
request.status = RequestStatus.RUNNING
main_process_metrics.num_requests_waiting.dec(1)
main_process_metrics.num_requests_running.inc(1)
if self.config.scheduler_config.splitwise_role == "mixed":
allocated_position = self.get_available_position()
request.idx = allocated_position
@@ -460,8 +457,6 @@ class ResourceManagerV1(ResourceManager):
request, self.config.cache_config.block_size, request.num_computed_tokens
)
request.status = RequestStatus.RUNNING
main_process_metrics.num_requests_waiting.dec(1)
main_process_metrics.num_requests_running.inc(1)
else:
if self.config.cache_config.enable_prefix_caching:
self._free_blocks(request)
@@ -520,11 +515,17 @@ class ResourceManagerV1(ResourceManager):
continue
if scheduled_reqs:
task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")
# Update metrics
num_tasks = sum([1 if task else 0 for task in self.tasks_list])
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("num_requests_running", len(self.running))
main_process_metrics.set_value("num_requests_waiting", num_tasks - len(self.running))
return scheduled_reqs
def get_available_position(self) -> int:
@@ -566,9 +567,9 @@ class ResourceManagerV1(ResourceManager):
request.skip_allocate = False
# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(matched_token_num)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", matched_token_num)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.cpu_cache_token_num)
if matched_token_num == request.need_prefill_tokens:
request.num_computed_tokens = matched_token_num - self.config.cache_config.block_size

View File

@@ -158,9 +158,9 @@ class EngineClient:
if "messages" in task:
del task["messages"]
api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}")
work_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
work_process_metrics.prompt_tokens_total.inc(input_ids_len)
work_process_metrics.request_prompt_tokens.observe(input_ids_len)
work_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"])
work_process_metrics.inc_value("prompt_tokens_total", input_ids_len)
work_process_metrics.obs_value("request_prompt_tokens", input_ids_len)
except Exception as e:
api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}")
raise EngineError(str(e), error_code=400)

View File

@@ -338,9 +338,7 @@ class OpenAIServingChat:
)
if res["finished"]:
num_choices -= 1
work_process_metrics.e2e_request_latency.observe(
time.time() - res["metrics"]["request_start_time"]
)
work_process_metrics.obs_value("e2e_request_latency", time.time() - res["metrics"]["request_start_time"])
has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
max_tokens = request.max_completion_tokens or request.max_tokens
if has_no_token_limit or previous_num_tokens != max_tokens:
@@ -533,7 +531,7 @@ class OpenAIServingChat:
total_tokens=num_prompt_tokens + num_generated_tokens,
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)),
)
work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"])
work_process_metrics.obs_value("e2e_request_latency", time.time() - final_res["metrics"]["request_start_time"])
res = ChatCompletionResponse(
id=request_id,
created=created_time,

View File

@@ -116,6 +116,12 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Max pre-fetch requests number in PD
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),
"FD_ENABLE_MODEL_LOAD_CACHE": lambda: bool(int(os.getenv("FD_ENABLE_MODEL_LOAD_CACHE", "0"))),
# Whether to clear cpu cache when clearing model weights.
"FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
# Whether to use labels in metrics.
"FD_ENABLE_METRIC_LABELS": lambda: bool(int(os.getenv("FD_ENABLE_METRIC_LABELS", "0"))),
# Default label values in metrics.
"FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"),
}
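As a usage sketch (the model name is a placeholder, not a documented default), labelled metrics could be enabled by exporting both variables before the server process starts, or equivalently in Python before fastdeploy is imported:

import json
import os

# Turn on model_id labels and provide a default label value; the value here is a placeholder.
os.environ["FD_ENABLE_METRIC_LABELS"] = "1"
os.environ["FD_DEFAULT_METRIC_LABEL_VALUES"] = json.dumps({"model_id": "my-model"})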

View File

@@ -0,0 +1,79 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from prometheus_client import (
Counter,
Gauge,
Histogram,
)
from fastdeploy import envs
import json
class MetricsManagerInterface:
def set_value(self, name, value, labelvalues=None):
metric = getattr(self, name, None)
if isinstance(metric, Gauge):
if envs.FD_ENABLE_METRIC_LABELS:
default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
if labelvalues is None:
labelvalues = {}
labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
metric.labels(**labelvalues).set(value)
else:
metric.set(value)
return
def inc_value(self, name, value=1, labelvalues=None):
metric = getattr(self, name, None)
if isinstance(metric, Gauge) or isinstance(metric, Counter):
if envs.FD_ENABLE_METRIC_LABELS:
default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
if labelvalues is None:
labelvalues = {}
labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
metric.labels(**labelvalues).inc(value)
else:
metric.inc(value)
return
def dec_value(self, name, value=1, labelvalues=None):
metric = getattr(self, name, None)
if isinstance(metric, Gauge):
if envs.FD_ENABLE_METRIC_LABELS:
default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
if labelvalues is None:
labelvalues = {}
labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
metric.labels(**labelvalues).dec(value)
else:
metric.dec(value)
return
def obs_value(self, name, value, labelvalues=None):
metric = getattr(self, name, None)
if isinstance(metric, Histogram):
if envs.FD_ENABLE_METRIC_LABELS:
default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES)
if labelvalues is None:
labelvalues = {}
labelvalues = {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in metric._labelnames}
metric.labels(**labelvalues).observe(value)
else:
metric.observe(value)
return
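The label resolution shared by all four methods can be exercised on its own; a minimal sketch of the fallback order (explicit labelvalues, then FD_DEFAULT_METRIC_LABEL_VALUES, then an empty string), with placeholder values:

import json

# Stand-ins for envs.FD_DEFAULT_METRIC_LABEL_VALUES and a metric's _labelnames.
default_labelvalues = json.loads('{"model_id": "my-model"}')
labelnames = ["model_id"]

def resolve(labelvalues=None):
    labelvalues = labelvalues or {}
    return {ln: labelvalues.get(ln, default_labelvalues.get(ln, "")) for ln in labelnames}

print(resolve())                       # {'model_id': 'my-model'}
print(resolve({"model_id": "other"}))  # {'model_id': 'other'}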

View File

@@ -34,6 +34,8 @@ from prometheus_client.registry import Collector
from fastdeploy.metrics import build_1_2_5_buckets
from fastdeploy.metrics.work_metrics import work_process_metrics
from fastdeploy.metrics.interface import MetricsManagerInterface
from fastdeploy import envs
def cleanup_prometheus_files(is_main: bool, instance_id: str = None):
@@ -51,7 +53,6 @@ def cleanup_prometheus_files(is_main: bool, instance_id: str = None):
return prom_dir
class SimpleCollector(Collector):
"""
A custom Prometheus collector that filters out specific metrics by name.
@@ -89,13 +90,19 @@ def get_filtered_metrics(exclude_names: Set[str], extra_register_func=None) -> s
:param exclude_names: metric.name set to be excluded
:param extra_register_func: optional, main process custom metric registration method
:return: filtered metric text (str)
generate_latest(filtered_registry) <- filtered_registry.collect <- SimpleCollector.collect
<- base_registry.collect <- MultiProcessCollector.collect
"""
# Register a MultiProcessCollector to base registry
# When a MultiProcessCollector collects, it reads metrics from *.db files in PROMETHEUS_MULTIPROC_DIR
base_registry = CollectorRegistry()
multiprocess.MultiProcessCollector(base_registry)
filtered_registry = CollectorRegistry()
filtered_registry.register(SimpleCollector(base_registry, exclude_names))
# extra_register_func is used to register custom metrics to filtered_registry
if extra_register_func:
extra_register_func(filtered_registry)
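A rough standalone sketch of the collection chain described in the docstring, using a stand-in filter collector (SimpleCollector itself lives in this file but its body is not shown here); the excluded metric name is a placeholder, and the MultiProcessCollector line is left commented out because it requires PROMETHEUS_MULTIPROC_DIR:

from typing import Set

from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.registry import Collector

class FilterCollector(Collector):
    # Stand-in for SimpleCollector: re-collect from the base registry, dropping excluded names.
    def __init__(self, base_registry, exclude_names: Set[str]):
        self.base_registry = base_registry
        self.exclude_names = exclude_names

    def collect(self):
        for metric in self.base_registry.collect():
            if metric.name not in self.exclude_names:
                yield metric

base_registry = CollectorRegistry()
# multiprocess.MultiProcessCollector(base_registry)  # reads *.db files from PROMETHEUS_MULTIPROC_DIR
filtered_registry = CollectorRegistry()
filtered_registry.register(FilterCollector(base_registry, {"fastdeploy:requests_number"}))
print(generate_latest(filtered_registry).decode())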
@@ -127,7 +134,7 @@ REQUEST_LATENCY_BUCKETS = [
]
class MetricsManager:
class MetricsManager(MetricsManagerInterface):
"""Prometheus Metrics Manager handles all metric updates"""
_instance = None
@@ -392,11 +399,17 @@ class MetricsManager:
"kwargs": {},
},
}
SPECULATIVE_METRICS = {}
def __init__(self):
"""Initializes the Prometheus metrics and starts the HTTP server if not already initialized."""
# Dynamically create all metrics
# Add labels to existing metrics: model_id
if envs.FD_ENABLE_METRIC_LABELS:
for metric in self.METRICS:
self.METRICS[metric]["kwargs"]["labelnames"] = ["model_id"]
# Create metrics dynamically
for metric_name, config in self.METRICS.items():
setattr(
self,

View File

@@ -21,9 +21,10 @@ metrics
from prometheus_client import Counter, Histogram
from fastdeploy.metrics.metrics import build_1_2_5_buckets
from fastdeploy.metrics.interface import MetricsManagerInterface
from fastdeploy import envs
class WorkMetricsManager:
class WorkMetricsManager(MetricsManagerInterface):
"""Prometheus Metrics Manager handles all metric updates"""
_initialized = False
@@ -34,6 +35,9 @@ class WorkMetricsManager:
if self._initialized:
return
# Add labels to existing metrics: model_id
LABEL_NAMES = ["model_id"] if envs.FD_ENABLE_METRIC_LABELS else []
self.e2e_request_latency = Histogram(
"fastdeploy:e2e_request_latency_seconds",
"End-to-end request latency (from request arrival to final response)",
@@ -60,22 +64,26 @@ class WorkMetricsManager:
1920.0,
7680.0,
],
labelnames=LABEL_NAMES,
)
self.request_params_max_tokens = Histogram(
name="fastdeploy:request_params_max_tokens",
documentation="Histogram of max_tokens parameter in request parameters",
buckets=build_1_2_5_buckets(33792),
labelnames=LABEL_NAMES,
)
self.prompt_tokens_total = Counter(
name="fastdeploy:prompt_tokens_total",
documentation="Total number of prompt tokens processed",
labelnames=LABEL_NAMES,
)
self.request_prompt_tokens = Histogram(
name="fastdeploy:request_prompt_tokens",
documentation="Number of prefill tokens processed.",
buckets=build_1_2_5_buckets(33792),
labelnames=LABEL_NAMES,
)
self._initialized = True
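Worth noting: when FD_ENABLE_METRIC_LABELS is unset, LABEL_NAMES is empty and these metrics are created and updated exactly as before; a standalone sketch with a placeholder metric name:

from prometheus_client import Counter

FD_ENABLE_METRIC_LABELS = False  # stand-in for envs.FD_ENABLE_METRIC_LABELS
LABEL_NAMES = ["model_id"] if FD_ENABLE_METRIC_LABELS else []

demo_prompt_tokens_total = Counter(
    name="demo_prompt_tokens_total",
    documentation="Total number of prompt tokens processed",
    labelnames=LABEL_NAMES,
)
# With labels disabled, the counter is updated exactly as before:
demo_prompt_tokens_total.inc(128)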

View File

@@ -342,7 +342,7 @@ class TokenProcessor:
while current_index < len(self.prefill_time_signal.value):
prefill_time = self.prefill_time_signal.value[current_index]
if prefill_time > 0:
main_process_metrics.request_prefill_time.observe(prefill_time)
main_process_metrics.obs_value("request_prefill_time", prefill_time)
self.prefill_time_signal.value[current_index] = 0
current_index += 1
except Exception as e:
@@ -399,14 +399,10 @@ class TokenProcessor:
if task_id in self.resource_manager.req_dict:
del self.resource_manager.req_dict[task_id]
task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list])
main_process_metrics.available_gpu_block_num.set(
self.resource_manager.total_block_number() - task_used_block_num
)
main_process_metrics.batch_size.set(
self.resource_manager.max_num_seqs - self.resource_manager.available_batch()
)
main_process_metrics.available_batch_size.set(self.resource_manager.available_batch())
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list])
main_process_metrics.set_value("available_gpu_block_num", self.resource_manager.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.set_value("batch_size", self.resource_manager.max_num_seqs - self.resource_manager.available_batch())
main_process_metrics.set_value("available_batch_size", self.resource_manager.available_batch())
if task_id in self.tokens_counter:
del self.tokens_counter[task_id]
@@ -597,30 +593,30 @@ class TokenProcessor:
"""Record all metrics for a task"""
if hasattr(task, "last_token_time") and task.last_token_time is not None:
token_gen_time = current_time - task.last_token_time
main_process_metrics.time_per_output_token.observe(token_gen_time)
main_process_metrics.obs_value("time_per_output_token", token_gen_time)
task.last_token_time = current_time
# Record generation metrics
main_process_metrics.generation_tokens_total.inc(len(token_ids))
main_process_metrics.inc_value("generation_tokens_total", len(token_ids))
def _record_first_token_metrics(self, task, current_time):
"""Record metrics for first token"""
task.first_token_time = current_time
main_process_metrics.first_token_latency.set(current_time - task.inference_start_time)
main_process_metrics.time_to_first_token.observe(current_time - task.inference_start_time)
main_process_metrics.request_queue_time.observe(task.schedule_start_time - task.preprocess_end_time)
main_process_metrics.set_value("first_token_latency", current_time - task.inference_start_time)
main_process_metrics.obs_value("time_to_first_token", current_time - task.inference_start_time)
main_process_metrics.obs_value("request_queue_time", task.schedule_start_time - task.preprocess_end_time)
def _record_completion_metrics(self, task, current_time):
"""Record metrics when request completes"""
if hasattr(task, "first_token_time"):
decode_time = current_time - task.first_token_time
main_process_metrics.request_decode_time.observe(decode_time)
main_process_metrics.obs_value("request_decode_time", decode_time)
main_process_metrics.num_requests_running.dec(1)
main_process_metrics.request_success_total.inc()
main_process_metrics.infer_latency.set(current_time - task.inference_start_time)
main_process_metrics.request_inference_time.observe(current_time - task.inference_start_time)
main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id])
main_process_metrics.dec_value("num_requests_running", 1)
main_process_metrics.inc_value("request_success_total", 1)
main_process_metrics.set_value("infer_latency", current_time - task.inference_start_time)
main_process_metrics.obs_value("request_inference_time", current_time - task.inference_start_time)
main_process_metrics.obs_value("request_generation_tokens", self.tokens_counter[task.request_id])
def _record_speculative_decoding_mertics(self, accept_num):
"""Record metrics of speculative decoding"""

View File

@@ -22,7 +22,7 @@ import traceback
import zmq
from fastdeploy.inter_communicator import ZmqTcpServer
from fastdeploy.metrics.metrics import get_filtered_metrics, main_process_metrics
from fastdeploy.metrics.metrics import EXCLUDE_LABELS, get_filtered_metrics, main_process_metrics
from fastdeploy.utils import envs, get_logger
logger = get_logger("internal_adapter_utils", "internal_adapter_utils.log")
@@ -89,7 +89,7 @@ class InternalAdapter:
elif task["cmd"] == "get_metrics":
metrics_text = get_filtered_metrics(
[],
EXCLUDE_LABELS,
extra_register_func=lambda reg: main_process_metrics.register_all(reg, workers=1),
)
result = {"task_id": task_id_str, "result": metrics_text}

View File

@@ -161,7 +161,7 @@ class SplitwiseConnector:
self.logger.warning(f"Send queue full for {addr}")
except Exception as e:
self.logger.error(f"Send to {addr} failed: {e}, {str(traceback.format_exc())}")
main_process_metrics.send_cache_failed_num.inc()
main_process_metrics.inc_value("send_cache_failed_num")
self._close_connection(addr)
except Exception as e: