diff --git a/docs/online_serving/metrics.md b/docs/online_serving/metrics.md
index c5c16ee81..f71fa6390 100644
--- a/docs/online_serving/metrics.md
+++ b/docs/online_serving/metrics.md
@@ -20,7 +20,12 @@ After FastDeploy is launched, it supports continuous monitoring of the FastDeplo
 | `fastdeploy:gpu_cache_usage_perc` | Gauge | GPU KV-cache usage rate | Percentage |
 | `fastdeploy:request_params_max_tokens` | Histogram | Distribution of max_tokens for requests | Count |
 | `fastdeploy:request_success_total` | Counter | Number of successfully processed requests | Count |
-
+| `fastdeploy:cache_config_info` | Gauge | Information of the engine's CacheConfig | Count |
+| `fastdeploy:available_batch_size` | Gauge | Number of requests that can still be inserted during the Decode phase | Count |
+| `fastdeploy:hit_req_rate` | Gauge | Request-level prefix cache hit rate | Percentage |
+| `fastdeploy:hit_token_rate` | Gauge | Token-level prefix cache hit rate | Percentage |
+| `fastdeploy:cpu_hit_token_rate` | Gauge | Token-level CPU prefix cache hit rate | Percentage |
+| `fastdeploy:gpu_hit_token_rate` | Gauge | Token-level GPU prefix cache hit rate | Percentage |
 ## Accessing Metrics
 
 - Access URL: `http://localhost:8000/metrics`
diff --git a/docs/zh/online_serving/metrics.md b/docs/zh/online_serving/metrics.md
index 578124acd..cb9448257 100644
--- a/docs/zh/online_serving/metrics.md
+++ b/docs/zh/online_serving/metrics.md
@@ -20,7 +20,12 @@
 | `fastdeploy:gpu_cache_usage_perc` | Gauge | GPU KV-cache 使用率 | 百分比 |
 | `fastdeploy:request_params_max_tokens` | Histogram | 请求的 max_tokens 分布 | 个 |
 | `fastdeploy:request_success_total` | Counter | 成功处理的请求个数 | 个 |
-
+| `fastdeploy:cache_config_info` | Gauge | 推理引擎的缓存配置信息 | 个 |
+| `fastdeploy:available_batch_size` | Gauge | Decode阶段还可以插入的请求数量 | 个 |
+| `fastdeploy:hit_req_rate` | Gauge | 请求级别前缀缓存命中率 | 百分比 |
+| `fastdeploy:hit_token_rate` | Gauge | token级别前缀缓存命中率 | 百分比 |
+| `fastdeploy:cpu_hit_token_rate` | Gauge | token级别CPU前缀缓存命中率 | 百分比 |
+| `fastdeploy:gpu_hit_token_rate` | Gauge | token级别GPU前缀缓存命中率 | 百分比 |
 ## 指标访问
 
 - 访问地址:`http://localhost:8000/metrics`
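A quick way to sanity-check the gauges documented above is to scrape the endpoint the docs mention. A minimal sketch, assuming a FastDeploy server is already listening on `localhost:8000`; only the metric names are taken from the tables above:

```python
# Minimal sketch: scrape the FastDeploy /metrics endpoint and print the new
# prefix-cache gauges. Assumes a server is already running on localhost:8000;
# only the metric names come from the documentation tables above.
import urllib.request

NEW_GAUGES = (
    "fastdeploy:available_batch_size",
    "fastdeploy:hit_req_rate",
    "fastdeploy:hit_token_rate",
    "fastdeploy:cpu_hit_token_rate",
    "fastdeploy:gpu_hit_token_rate",
)

with urllib.request.urlopen("http://localhost:8000/metrics") as resp:
    body = resp.read().decode("utf-8")

for line in body.splitlines():
    # str.startswith accepts a tuple, so this keeps only the new sample lines.
    if line.startswith(NEW_GAUGES):
        print(line)
```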
""" +from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.utils import get_logger logger = get_logger("prefix_cache_manager", "prefix_cache_manager.log") @@ -54,6 +55,11 @@ class CacheMetrics: self.cpu_hit_token_ratio = self.total_cpu_matched_token_num / self.total_token_num self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num + main_process_metrics.hit_req_rate.set(self.hit_req_ratio) + main_process_metrics.hit_token_rate.set(self.hit_token_ratio) + main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio) + main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio) + logger.info( f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}" + f" hit_req_ratio {self.hit_req_ratio:.2f} hit_token_ratio {self.hit_token_ratio:.2f}" diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index e24db25b4..c3978443d 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -39,6 +39,7 @@ from fastdeploy.engine.expert_service import start_data_parallel_service from fastdeploy.engine.request import Request from fastdeploy.input.preprocess import InputPreprocessor from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal +from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.utils import EngineError, console_logger, envs, llm_logger @@ -101,6 +102,8 @@ class LLMEngine: self.do_profile = 0 self._finalizer = weakref.finalize(self, self._exit_sub_services) + main_process_metrics.set_cache_config_info(obj=self.cfg.cache_config) + def start(self, api_server_pid=None): """ Initializes the engine and starts its sub-services. diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index a1ec5efc3..556547073 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -315,7 +315,6 @@ class ResourceManager: main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num) main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch()) main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) - llm_logger.info( f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}" ) diff --git a/fastdeploy/metrics/metrics.py b/fastdeploy/metrics/metrics.py index 0798d89af..9b798b873 100644 --- a/fastdeploy/metrics/metrics.py +++ b/fastdeploy/metrics/metrics.py @@ -169,7 +169,12 @@ class MetricsManager: send_cache_failed_num: "Counter" first_token_latency: "Gauge" infer_latency: "Gauge" - + cache_config_info: "Gauge" + available_batch_size: "Gauge" + hit_req_rate: "Gauge" + hit_token_rate: "Gauge" + cpu_hit_token_rate: "Gauge" + gpu_hit_token_rate: "Gauge" # 定义所有指标配置 METRICS = { "num_requests_running": { @@ -359,6 +364,36 @@ class MetricsManager: "description": "Latest time to generate one token in seconds", "kwargs": {}, }, + "available_batch_size": { + "type": Gauge, + "name": "fastdeploy:available_batch_size", + "description": "Number of requests that can still be inserted during the Decode phase", + "kwargs": {}, + }, + "hit_req_rate": { + "type": Gauge, + "name": "fastdeploy:hit_req_rate", + "description": "Request-level prefix cache hit rate", + "kwargs": {}, + }, + "hit_token_rate": { + "type": Gauge, + "name": "fastdeploy:hit_token_rate", + "description": "Token-level prefix cache hit rate", + "kwargs": {}, + }, + "cpu_hit_token_rate": { + "type": Gauge, + 
"name": "fastdeploy:cpu_hit_token_rate", + "description": "Token-level CPU prefix cache hit rate", + "kwargs": {}, + }, + "gpu_hit_token_rate": { + "type": Gauge, + "name": "fastdeploy:gpu_hit_token_rate", + "description": "Token-level GPU prefix cache hit rate", + "kwargs": {}, + }, } SPECULATIVE_METRICS = {} @@ -434,6 +469,26 @@ class MetricsManager: ), ) + def set_cache_config_info(self, obj) -> None: + if hasattr(self, "cache_config_info") and isinstance(self.cache_config_info, Gauge): + metrics_info = obj.metrics_info() + if metrics_info: + self.cache_config_info.labels(**metrics_info).set(1) + return + + metrics_info = obj.metrics_info() + if not metrics_info: + return + + self.cache_config_info = Gauge( + name="fastdeploy:cache_config_info", + documentation="Information of the engine's CacheConfig", + labelnames=list(metrics_info.keys()), + multiprocess_mode="mostrecent", + ) + + self.cache_config_info.labels(**metrics_info).set(1) + def register_speculative_metrics(self, registry: CollectorRegistry): """Register all speculative metrics to the specified registry""" for metric_name in self.SPECULATIVE_METRICS: @@ -447,6 +502,8 @@ class MetricsManager: """Register all metrics to the specified registry""" for metric_name in self.METRICS: registry.register(getattr(self, metric_name)) + if self.cache_config_info is not None: + registry.register(self.cache_config_info) if workers == 1: registry.register(work_process_metrics.e2e_request_latency) registry.register(work_process_metrics.request_params_max_tokens) diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 48d5914a8..8245e5657 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -282,6 +282,7 @@ class TokenProcessor: main_process_metrics.batch_size.set( self.resource_manager.max_num_seqs - self.resource_manager.available_batch() ) + main_process_metrics.available_batch_size.set(self.resource_manager.available_batch()) if task_id in self.tokens_counter: del self.tokens_counter[task_id] diff --git a/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py b/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py index f29f4e110..13f854f6e 100644 --- a/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py +++ b/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py @@ -421,6 +421,12 @@ def test_metrics_endpoint(metrics_url): gpu_cache_usage_perc_found = False request_params_max_tokens_sum_found = False request_success_total_found = False + cache_config_info_found = False + available_batch_size_found = False + hit_req_rate_found = False + hit_token_rate_found = False + cpu_hit_token_rate_found = False + gpu_hit_token_rate_found = False for line in metric_lines: if line.startswith("fastdeploy:num_requests_running"): @@ -487,7 +493,30 @@ def test_metrics_endpoint(metrics_url): _, value = line.rsplit(" ", 1) assert float(value) >= 0, "request_success_total 值错误" request_success_total_found = True - + elif line.startswith("fastdeploy:cache_config_info"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "cache_config_info 值错误" + cache_config_info_found = True + elif line.startswith("fastdeploy:available_batch_size"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "available_batch_size 值错误" + available_batch_size_found = True + elif line.startswith("fastdeploy:hit_req_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "hit_req_rate 值错误" + 
diff --git a/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py b/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py
index f29f4e110..13f854f6e 100644
--- a/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py
+++ b/tests/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py
@@ -421,6 +421,12 @@ def test_metrics_endpoint(metrics_url):
     gpu_cache_usage_perc_found = False
     request_params_max_tokens_sum_found = False
     request_success_total_found = False
+    cache_config_info_found = False
+    available_batch_size_found = False
+    hit_req_rate_found = False
+    hit_token_rate_found = False
+    cpu_hit_token_rate_found = False
+    gpu_hit_token_rate_found = False
 
     for line in metric_lines:
         if line.startswith("fastdeploy:num_requests_running"):
@@ -487,7 +493,30 @@
             _, value = line.rsplit(" ", 1)
             assert float(value) >= 0, "request_success_total 值错误"
             request_success_total_found = True
-
+        elif line.startswith("fastdeploy:cache_config_info"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "cache_config_info 值错误"
+            cache_config_info_found = True
+        elif line.startswith("fastdeploy:available_batch_size"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "available_batch_size 值错误"
+            available_batch_size_found = True
+        elif line.startswith("fastdeploy:hit_req_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "hit_req_rate 值错误"
+            hit_req_rate_found = True
+        elif line.startswith("fastdeploy:hit_token_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "hit_token_rate 值错误"
+            hit_token_rate_found = True
+        elif line.startswith("fastdeploy:cpu_hit_token_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "cpu_hit_token_rate 值错误"
+            cpu_hit_token_rate_found = True
+        elif line.startswith("fastdeploy:gpu_hit_token_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "gpu_hit_token_rate 值错误"
+            gpu_hit_token_rate_found = True
     assert num_requests_running_found, "缺少 fastdeploy:num_requests_running 指标"
     assert num_requests_waiting_found, "缺少 fastdeploy:num_requests_waiting 指标"
     assert time_to_first_token_seconds_sum_found, "缺少 fastdeploy:time_to_first_token_seconds_sum 指标"
@@ -504,6 +533,12 @@
     assert gpu_cache_usage_perc_found, "缺少 fastdeploy:gpu_cache_usage_perc 指标"
     assert request_params_max_tokens_sum_found, "缺少 fastdeploy:request_params_max_tokens_sum 指标"
     assert request_success_total_found, "缺少 fastdeploy:request_success_total 指标"
+    assert cache_config_info_found, "缺少 fastdeploy:cache_config_info 指标"
+    assert available_batch_size_found, "缺少 fastdeploy:available_batch_size 指标"
+    assert hit_req_rate_found, "缺少 fastdeploy:hit_req_rate 指标"
+    assert hit_token_rate_found, "缺少 fastdeploy:hit_token_rate 指标"
+    assert cpu_hit_token_rate_found, "缺少 fastdeploy:cpu_hit_token_rate 指标"
+    assert gpu_hit_token_rate_found, "缺少 fastdeploy:gpu_hit_token_rate 指标"
 
 
 # ==========================
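The checks above match sample lines with `startswith` and `rsplit(" ", 1)`, which works but is easy to trip up with prefix collisions. An alternative sketch that parses the scraped text with `prometheus_client`'s own exposition parser (a real helper in that library), shown here only as an illustration, not as a proposed change to the tests:

```python
# Alternative sketch: instead of matching sample lines with startswith/rsplit,
# parse the scraped text with prometheus_client's exposition parser. Each
# family yields samples carrying an exact name, labels and value.
from prometheus_client.parser import text_string_to_metric_families

def metric_values(metrics_text, wanted):
    """Return {sample_name: value} for every sample whose name is in `wanted`."""
    values = {}
    for family in text_string_to_metric_families(metrics_text):
        for sample in family.samples:
            if sample.name in wanted:
                values[sample.name] = sample.value
    return values

text = "fastdeploy:hit_req_rate 0.5\nfastdeploy:hit_token_rate 0.6\n"
print(metric_values(text, {"fastdeploy:hit_req_rate", "fastdeploy:hit_token_rate"}))
```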
diff --git a/tests/e2e/test_Qwen2-7B-Instruct_serving.py b/tests/e2e/test_Qwen2-7B-Instruct_serving.py
index cf836ef94..1c8c0ca31 100644
--- a/tests/e2e/test_Qwen2-7B-Instruct_serving.py
+++ b/tests/e2e/test_Qwen2-7B-Instruct_serving.py
@@ -434,6 +434,12 @@ def test_metrics_endpoint(metrics_url):
     gpu_cache_usage_perc_found = False
     request_params_max_tokens_sum_found = False
     request_success_total_found = False
+    cache_config_info_found = False
+    available_batch_size_found = False
+    hit_req_rate_found = False
+    hit_token_rate_found = False
+    cpu_hit_token_rate_found = False
+    gpu_hit_token_rate_found = False
 
     for line in metric_lines:
         if line.startswith("fastdeploy:num_requests_running"):
@@ -500,7 +506,30 @@
             _, value = line.rsplit(" ", 1)
             assert float(value) >= 0, "request_success_total 值错误"
             request_success_total_found = True
-
+        elif line.startswith("fastdeploy:cache_config_info"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "cache_config_info 值错误"
+            cache_config_info_found = True
+        elif line.startswith("fastdeploy:available_batch_size"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "available_batch_size 值错误"
+            available_batch_size_found = True
+        elif line.startswith("fastdeploy:hit_req_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "hit_req_rate 值错误"
+            hit_req_rate_found = True
+        elif line.startswith("fastdeploy:hit_token_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "hit_token_rate 值错误"
+            hit_token_rate_found = True
+        elif line.startswith("fastdeploy:cpu_hit_token_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "cpu_hit_token_rate 值错误"
+            cpu_hit_token_rate_found = True
+        elif line.startswith("fastdeploy:gpu_hit_token_rate"):
+            _, value = line.rsplit(" ", 1)
+            assert float(value) >= 0, "gpu_hit_token_rate 值错误"
+            gpu_hit_token_rate_found = True
     assert num_requests_running_found, "缺少 fastdeploy:num_requests_running 指标"
     assert num_requests_waiting_found, "缺少 fastdeploy:num_requests_waiting 指标"
     assert time_to_first_token_seconds_sum_found, "缺少 fastdeploy:time_to_first_token_seconds_sum 指标"
@@ -517,6 +546,12 @@
     assert gpu_cache_usage_perc_found, "缺少 fastdeploy:gpu_cache_usage_perc 指标"
    assert request_params_max_tokens_sum_found, "缺少 fastdeploy:request_params_max_tokens_sum 指标"
     assert request_success_total_found, "缺少 fastdeploy:request_success_total 指标"
+    assert cache_config_info_found, "缺少 fastdeploy:cache_config_info 指标"
+    assert available_batch_size_found, "缺少 fastdeploy:available_batch_size 指标"
+    assert hit_req_rate_found, "缺少 fastdeploy:hit_req_rate 指标"
+    assert hit_token_rate_found, "缺少 fastdeploy:hit_token_rate 指标"
+    assert cpu_hit_token_rate_found, "缺少 fastdeploy:cpu_hit_token_rate 指标"
+    assert gpu_hit_token_rate_found, "缺少 fastdeploy:gpu_hit_token_rate 指标"
 
 
 # ==========================
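Both unit tests in the new file below patch `fastdeploy.output.token_processor.main_process_metrics` rather than the defining module `fastdeploy.metrics.metrics`. That is deliberate: `mock.patch` must target the name where it is looked up, not where it is defined. A self-contained toy demonstrating the rule; the module names here are made up:

```python
# Why the tests patch the consumer module, not the defining one: mock.patch
# must target the name where it is *looked up*. Toy modules, names made up.
import sys
import types
from unittest.mock import MagicMock, patch

defining = types.ModuleType("toy_metrics")   # plays fastdeploy.metrics.metrics
defining.main_process_metrics = object()
consumer = types.ModuleType("toy_consumer")  # plays fastdeploy.output.token_processor
consumer.main_process_metrics = defining.main_process_metrics  # "from ... import ..."
sys.modules["toy_metrics"] = defining
sys.modules["toy_consumer"] = consumer

with patch("toy_consumer.main_process_metrics", MagicMock()) as mocked:
    # Code inside the consumer module now sees the mock; the defining
    # module still holds the original object.
    assert consumer.main_process_metrics is mocked
    assert defining.main_process_metrics is not mocked
```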
diff --git a/tests/metrics/test_new_metrics.py b/tests/metrics/test_new_metrics.py
new file mode 100644
index 000000000..bbcaba3fc
--- /dev/null
+++ b/tests/metrics/test_new_metrics.py
@@ -0,0 +1,92 @@
+import unittest
+from unittest.mock import MagicMock, patch
+
+from fastdeploy.cache_manager.cache_metrics import CacheMetrics
+from fastdeploy.output.token_processor import TokenProcessor
+
+
+class TestCoverageFix(unittest.TestCase):
+    @patch("fastdeploy.cache_manager.cache_metrics.main_process_metrics")
+    def test_cache_metrics_update_history(self, mock_main_process_metrics):
+        """
+        Test the CacheMetrics._update_history_hit_metrics method.
+
+        Goal: ensure the .set() methods of main_process_metrics are called correctly, covering lines 58-61.
+        """
+        print("\nRunning test for CacheMetrics._update_history_hit_metrics...")
+        metrics = CacheMetrics()
+
+        # Prepare data so the ratio computations cannot divide by zero
+        metrics.req_count = 20
+        metrics.hit_req_count = 10
+        metrics.total_token_num = 1000
+        metrics.total_cpu_matched_token_num = 250
+        metrics.total_gpu_matched_token_num = 350
+        metrics.matched_token_num = metrics.total_cpu_matched_token_num + metrics.total_gpu_matched_token_num
+
+        # Call the method under test
+        metrics._update_history_hit_metrics()
+
+        # Assert that each Prometheus gauge was set with the expected value
+        mock_main_process_metrics.hit_req_rate.set.assert_called_once_with(0.5)  # 10 / 20
+        mock_main_process_metrics.hit_token_rate.set.assert_called_once_with(0.6)  # 600 / 1000
+        mock_main_process_metrics.cpu_hit_token_rate.set.assert_called_once_with(0.25)  # 250 / 1000
+        mock_main_process_metrics.gpu_hit_token_rate.set.assert_called_once_with(0.35)  # 350 / 1000
+
+        print("Test for CacheMetrics passed.")
+
+    def setUp(self):
+        """Set up shared mock objects for the TokenProcessor tests."""
+        self.mock_cfg = MagicMock()
+        self.mock_cached_generated_tokens = MagicMock()
+        self.mock_engine_worker_queue = MagicMock()
+        self.mock_split_connector = MagicMock()
+        self.mock_resource_manager = MagicMock()
+
+        with patch("fastdeploy.output.token_processor.IPCSignal"):
+            self.processor = TokenProcessor(
+                cfg=self.mock_cfg,
+                cached_generated_tokens=self.mock_cached_generated_tokens,
+                engine_worker_queue=self.mock_engine_worker_queue,
+                split_connector=self.mock_split_connector,
+            )
+        self.processor.resource_manager = self.mock_resource_manager
+
+    # Patch main_process_metrics where the token_processor module looks it up
+    @patch("fastdeploy.output.token_processor.main_process_metrics")
+    def test_recycle_resources_updates_metrics(self, mock_main_process_metrics):
+        """
+        Test the TokenProcessor._recycle_resources method.
+
+        Goal: ensure metrics such as available_batch_size are updated, covering the code around line 285.
+        """
+        print("\nRunning test for TokenProcessor._recycle_resources (metric update)...")
+
+        # 1. Prepare test data and mock behavior
+        task_id = "request-456"
+        index = 0
+        mock_task = MagicMock()
+
+        # Configure the mock return values on resource_manager
+        self.mock_resource_manager.available_batch.return_value = 8
+        self.mock_resource_manager.total_block_number.return_value = 1024
+        self.mock_resource_manager.max_num_seqs = 16
+
+        # _recycle_resources manipulates these lists/dicts internally
+        self.mock_resource_manager.tasks_list = [mock_task]
+        self.mock_resource_manager.stop_flags = [False]
+
+        # Avoid a KeyError from del self.tokens_counter[task_id]
+        self.processor.tokens_counter[task_id] = 5
+
+        # Call the method under test
+        self.processor._recycle_resources(task_id=task_id, index=index, task=mock_task, result=None, is_prefill=False)
+
+        # Core assertion: verify that the available_batch_size gauge was set correctly
+        mock_main_process_metrics.available_batch_size.set.assert_called_once_with(8)
+
+        print("Test for TokenProcessor passed.")
+
+
+if __name__ == "__main__":
+    unittest.main()
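Beyond the mocked unit tests, the new hit-rate gauges can be sanity-checked end to end. A sketch, not part of the PR: send the same prompt twice and expect `fastdeploy:hit_token_rate` to rise once the prefix cache can serve the repeated prefix. It assumes a FastDeploy server with prefix caching enabled on `localhost:8000`; the completion endpoint path is illustrative.

```python
# End-to-end sanity sketch for the new prefix-cache gauges (not part of the PR).
# Assumes a server with prefix caching enabled on localhost:8000; the
# /v1/completions path and payload shape are illustrative assumptions.
import json
import urllib.request

def scrape(name):
    """Return the current value of a plain (unlabelled) sample line, or 0.0."""
    with urllib.request.urlopen("http://localhost:8000/metrics") as resp:
        for line in resp.read().decode().splitlines():
            if line.startswith(name + " "):
                return float(line.rsplit(" ", 1)[1])
    return 0.0

def complete(prompt):
    payload = json.dumps({"prompt": prompt, "max_tokens": 16}).encode()
    req = urllib.request.Request(
        "http://localhost:8000/v1/completions",  # illustrative path
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req).read()

prompt = "Explain prefix caching in one sentence. " * 4
complete(prompt)
before = scrape("fastdeploy:hit_token_rate")
complete(prompt)  # identical prefix, should hit the cache
after = scrape("fastdeploy:hit_token_rate")
print(f"hit_token_rate: {before:.2f} -> {after:.2f}")
```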