diff --git a/docs/online_serving/metrics.md b/docs/online_serving/metrics.md
index c5c16ee81..f71fa6390 100644
--- a/docs/online_serving/metrics.md
+++ b/docs/online_serving/metrics.md
@@ -20,7 +20,12 @@ After FastDeploy is launched, it supports continuous monitoring of the FastDeplo
 | `fastdeploy:gpu_cache_usage_perc` | Gauge | GPU KV-cache usage rate | Percentage |
 | `fastdeploy:request_params_max_tokens` | Histogram | Distribution of max_tokens for requests | Count |
 | `fastdeploy:request_success_total` | Counter | Number of successfully processed requests | Count |
-
+| `fastdeploy:cache_config_info` | Gauge | Information of the engine's CacheConfig | Count |
+| `fastdeploy:available_batch_size` | Gauge | Number of requests that can still be inserted during the Decode phase | Count |
+| `fastdeploy:hit_req_rate` | Gauge | Request-level prefix cache hit rate | Percentage |
+| `fastdeploy:hit_token_rate` | Gauge | Token-level prefix cache hit rate | Percentage |
+| `fastdeploy:cpu_hit_token_rate` | Gauge | Token-level CPU prefix cache hit rate | Percentage |
+| `fastdeploy:gpu_hit_token_rate` | Gauge | Token-level GPU prefix cache hit rate | Percentage |
 
 ## Accessing Metrics
 - Access URL: `http://localhost:8000/metrics`
diff --git a/docs/zh/online_serving/metrics.md b/docs/zh/online_serving/metrics.md
index 578124acd..cb9448257 100644
--- a/docs/zh/online_serving/metrics.md
+++ b/docs/zh/online_serving/metrics.md
@@ -20,7 +20,12 @@
 | `fastdeploy:gpu_cache_usage_perc` | Gauge | GPU KV-cache 使用率 | 百分比 |
 | `fastdeploy:request_params_max_tokens` | Histogram | 请求的 max_tokens 分布 | 个 |
 | `fastdeploy:request_success_total` | Counter | 成功处理的请求个数 | 个 |
-
+| `fastdeploy:cache_config_info` | Gauge | 推理引擎的缓存配置信息 | 个 |
+| `fastdeploy:available_batch_size` | Gauge | Decode阶段还可以插入的请求数量 | 个 |
+| `fastdeploy:hit_req_rate` | Gauge | 请求级别前缀缓存命中率 | 百分比 |
+| `fastdeploy:hit_token_rate` | Gauge | token级别前缀缓存命中率 | 百分比 |
+| `fastdeploy:cpu_hit_token_rate` | Gauge | token级别CPU前缀缓存命中率 | 百分比 |
+| `fastdeploy:gpu_hit_token_rate` | Gauge | token级别GPU前缀缓存命中率 | 百分比 |
 
 ## 指标访问
 - 访问地址:`http://localhost:8000/metrics`
diff --git a/fastdeploy/cache_manager/cache_metrics.py b/fastdeploy/cache_manager/cache_metrics.py
index 2f5acf36a..3946357c8 100644
--- a/fastdeploy/cache_manager/cache_metrics.py
+++ b/fastdeploy/cache_manager/cache_metrics.py
@@ -14,6 +14,7 @@
 # limitations under the License.
""" +from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.utils import get_logger logger = get_logger("prefix_cache_manager", "prefix_cache_manager.log") @@ -54,6 +55,11 @@ class CacheMetrics: self.cpu_hit_token_ratio = self.total_cpu_matched_token_num / self.total_token_num self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num + main_process_metrics.hit_req_rate.set(self.hit_req_ratio) + main_process_metrics.hit_token_rate.set(self.hit_token_ratio) + main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio) + main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio) + logger.info( f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}" + f" hit_req_ratio {self.hit_req_ratio:.2f} hit_token_ratio {self.hit_token_ratio:.2f}" diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index bbc8651ad..6e26262cf 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -165,6 +165,7 @@ class LLMEngine: self.cfg.guided_decoding_backend, disable_any_whitespace=self.cfg.disable_any_whitespace, ) + main_process_metrics.set_cache_config_info(obj=self.cfg.cache_config) def start(self, api_server_pid=None): """ diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index 15c17bdd1..d228b98a8 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -318,7 +318,6 @@ class ResourceManager: main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num) main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch()) main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) - llm_logger.info( f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}" ) diff --git a/fastdeploy/metrics/metrics.py b/fastdeploy/metrics/metrics.py index 0798d89af..9b798b873 100644 --- a/fastdeploy/metrics/metrics.py +++ b/fastdeploy/metrics/metrics.py @@ -169,7 +169,12 @@ class MetricsManager: send_cache_failed_num: "Counter" first_token_latency: "Gauge" infer_latency: "Gauge" - + cache_config_info: "Gauge" + available_batch_size: "Gauge" + hit_req_rate: "Gauge" + hit_token_rate: "Gauge" + cpu_hit_token_rate: "Gauge" + gpu_hit_token_rate: "Gauge" # 定义所有指标配置 METRICS = { "num_requests_running": { @@ -359,6 +364,36 @@ class MetricsManager: "description": "Latest time to generate one token in seconds", "kwargs": {}, }, + "available_batch_size": { + "type": Gauge, + "name": "fastdeploy:available_batch_size", + "description": "Number of requests that can still be inserted during the Decode phase", + "kwargs": {}, + }, + "hit_req_rate": { + "type": Gauge, + "name": "fastdeploy:hit_req_rate", + "description": "Request-level prefix cache hit rate", + "kwargs": {}, + }, + "hit_token_rate": { + "type": Gauge, + "name": "fastdeploy:hit_token_rate", + "description": "Token-level prefix cache hit rate", + "kwargs": {}, + }, + "cpu_hit_token_rate": { + "type": Gauge, + "name": "fastdeploy:cpu_hit_token_rate", + "description": "Token-level CPU prefix cache hit rate", + "kwargs": {}, + }, + "gpu_hit_token_rate": { + "type": Gauge, + "name": "fastdeploy:gpu_hit_token_rate", + "description": "Token-level GPU prefix cache hit rate", + "kwargs": {}, + }, } SPECULATIVE_METRICS = {} @@ -434,6 +469,26 @@ class MetricsManager: ), ) + def set_cache_config_info(self, obj) -> None: + if hasattr(self, "cache_config_info") and 
isinstance(self.cache_config_info, Gauge): + metrics_info = obj.metrics_info() + if metrics_info: + self.cache_config_info.labels(**metrics_info).set(1) + return + + metrics_info = obj.metrics_info() + if not metrics_info: + return + + self.cache_config_info = Gauge( + name="fastdeploy:cache_config_info", + documentation="Information of the engine's CacheConfig", + labelnames=list(metrics_info.keys()), + multiprocess_mode="mostrecent", + ) + + self.cache_config_info.labels(**metrics_info).set(1) + def register_speculative_metrics(self, registry: CollectorRegistry): """Register all speculative metrics to the specified registry""" for metric_name in self.SPECULATIVE_METRICS: @@ -447,6 +502,8 @@ class MetricsManager: """Register all metrics to the specified registry""" for metric_name in self.METRICS: registry.register(getattr(self, metric_name)) + if self.cache_config_info is not None: + registry.register(self.cache_config_info) if workers == 1: registry.register(work_process_metrics.e2e_request_latency) registry.register(work_process_metrics.request_params_max_tokens) diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index c5511104f..a73a1634b 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -284,6 +284,7 @@ class TokenProcessor: main_process_metrics.batch_size.set( self.resource_manager.max_num_seqs - self.resource_manager.available_batch() ) + main_process_metrics.available_batch_size.set(self.resource_manager.available_batch()) if task_id in self.tokens_counter: del self.tokens_counter[task_id] diff --git a/requirements.txt b/requirements.txt index 0e0d5ca6f..c4a96a30e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ tqdm pynvml uvicorn==0.29.0 fastapi -paddleformers +paddleformers==0.1.2 redis etcd3 httpx diff --git a/scripts/run_ci_xpu.sh b/scripts/run_ci_xpu.sh index cb3ad94c1..dd79d55cf 100644 --- a/scripts/run_ci_xpu.sh +++ b/scripts/run_ci_xpu.sh @@ -16,7 +16,7 @@ python -m pip install -r requirements.txt echo "uninstall org" python -m pip uninstall paddlepaddle-xpu -y python -m pip uninstall fastdeploy-xpu -y -python -m pip install paddlepaddle-xpu -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/ +python -m pip install paddlepaddle-xpu==3.1.1 -i https://www.paddlepaddle.org.cn/packages/stable/xpu-p800/ echo "build whl" bash build.sh || exit 1 echo "pip others" diff --git a/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py b/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py index 5898d332f..03d0dee6f 100644 --- a/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py +++ b/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py @@ -417,6 +417,12 @@ def test_metrics_endpoint(metrics_url): gpu_cache_usage_perc_found = False request_params_max_tokens_sum_found = False request_success_total_found = False + cache_config_info_found = False + available_batch_size_found = False + hit_req_rate_found = False + hit_token_rate_found = False + cpu_hit_token_rate_found = False + gpu_hit_token_rate_found = False for line in metric_lines: if line.startswith("fastdeploy:num_requests_running"): @@ -483,7 +489,30 @@ def test_metrics_endpoint(metrics_url): _, value = line.rsplit(" ", 1) assert float(value) >= 0, "request_success_total 值错误" request_success_total_found = True - + elif line.startswith("fastdeploy:cache_config_info"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, 
"cache_config_info 值错误" + cache_config_info_found = True + elif line.startswith("fastdeploy:available_batch_size"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "available_batch_size 值错误" + available_batch_size_found = True + elif line.startswith("fastdeploy:hit_req_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "hit_req_rate 值错误" + hit_req_rate_found = True + elif line.startswith("fastdeploy:hit_token_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "hit_token_rate 值错误" + hit_token_rate_found = True + elif line.startswith("fastdeploy:cpu_hit_token_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "cpu_hit_token_rate 值错误" + cpu_hit_token_rate_found = True + elif line.startswith("fastdeploy:gpu_hit_token_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "gpu_hit_token_rate 值错误" + gpu_hit_token_rate_found = True assert num_requests_running_found, "缺少 fastdeploy:num_requests_running 指标" assert num_requests_waiting_found, "缺少 fastdeploy:num_requests_waiting 指标" assert time_to_first_token_seconds_sum_found, "缺少 fastdeploy:time_to_first_token_seconds_sum 指标" @@ -500,6 +529,12 @@ def test_metrics_endpoint(metrics_url): assert gpu_cache_usage_perc_found, "缺少 fastdeploy:gpu_cache_usage_perc 指标" assert request_params_max_tokens_sum_found, "缺少 fastdeploy:request_params_max_tokens_sum 指标" assert request_success_total_found, "缺少 fastdeploy:request_success_total 指标" + assert cache_config_info_found, "缺少 fastdeploy:cache_config_info 指标" + assert available_batch_size_found, "缺少 fastdeploy:available_batch_size 指标" + assert hit_req_rate_found, "缺少 fastdeploy:hit_req_rate 指标" + assert hit_token_rate_found, "缺少 fastdeploy:hit_token_rate 指标" + assert cpu_hit_token_rate_found, "缺少 fastdeploy:hit_token_rate 指标" + assert gpu_hit_token_rate_found, "缺少 fastdeploy:gpu_hit_token_rate 指标" # ========================== diff --git a/tests/e2e/test_Qwen2-7B-Instruct_serving.py b/tests/e2e/test_Qwen2-7B-Instruct_serving.py new file mode 100644 index 000000000..1c8c0ca31 --- /dev/null +++ b/tests/e2e/test_Qwen2-7B-Instruct_serving.py @@ -0,0 +1,818 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import concurrent.futures +import json +import os +import re +import shutil +import signal +import socket +import subprocess +import sys +import time + +import openai +import pytest +import requests +from jsonschema import validate + +# Read ports from environment variables; use default values if not set +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) +FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] + + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. + """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + + +def kill_process_on_port(port: int): + """ + Kill processes that are listening on the given port. + Uses `lsof` to find process ids and sends SIGKILL. + """ + try: + output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + current_pid = os.getpid() + parent_pid = os.getppid() + for pid in output.splitlines(): + pid = int(pid) + if pid in (current_pid, parent_pid): + print(f"Skip killing current process (pid={pid}) on port {port}") + continue + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except subprocess.CalledProcessError: + pass + + +def clean_ports(): + """ + Kill all processes occupying the ports listed in PORTS_TO_CLEAN. + """ + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + time.sleep(2) + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports() + + print("log dir clean ") + if os.path.exists("log") and os.path.isdir("log"): + shutil.rmtree("log") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "Qwen2-7B-Instruct") + else: + model_path = "./Qwen2-7B-Instruct" + + log_path = "server.log" + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "1", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "32768", + "--max-num-seqs", + "128", + "--quantization", + "wint8", + ] + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print(f"API server is up on port {FD_API_PORT}") + break + time.sleep(1) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. 
Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + clean_ports() + print(f"API server (pid={process.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. + """ + return {"Content-Type": "application/json"} + + +@pytest.fixture +def consistent_payload(): + """ + Returns a fixed payload for consistency testing, + including a fixed random seed and temperature. + """ + return { + "messages": [{"role": "user", "content": "用一句话介绍 PaddlePaddle"}], + "temperature": 0.9, + "top_p": 0, # fix top_p to reduce randomness + "seed": 13, # fixed random seed + } + + +# ========================== +# JSON Schema for validating chat API responses +# ========================== +chat_response_schema = { + "type": "object", + "properties": { + "id": {"type": "string"}, + "object": {"type": "string"}, + "created": {"type": "number"}, + "model": {"type": "string"}, + "choices": { + "type": "array", + "items": { + "type": "object", + "properties": { + "message": { + "type": "object", + "properties": { + "role": {"type": "string"}, + "content": {"type": "string"}, + }, + "required": ["role", "content"], + }, + "index": {"type": "number"}, + "finish_reason": {"type": "string"}, + }, + "required": ["message", "index", "finish_reason"], + }, + }, + }, + "required": ["id", "object", "created", "model", "choices"], +} + + +# ========================== +# Helper function to calculate difference rate between two texts +# ========================== +def calculate_diff_rate(text1, text2): + """ + Calculate the difference rate between two strings + based on the normalized Levenshtein edit distance. + Returns a float in [0,1], where 0 means identical. + """ + if text1 == text2: + return 0.0 + + len1, len2 = len(text1), len(text2) + dp = [[0] * (len2 + 1) for _ in range(len1 + 1)] + + for i in range(len1 + 1): + for j in range(len2 + 1): + if i == 0 or j == 0: + dp[i][j] = i + j + elif text1[i - 1] == text2[j - 1]: + dp[i][j] = dp[i - 1][j - 1] + else: + dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1]) + + edit_distance = dp[len1][len2] + max_len = max(len1, len2) + return edit_distance / max_len if max_len > 0 else 0.0 + + +# ========================== +# Valid prompt test cases for parameterized testing +# ========================== +valid_prompts = [ + [{"role": "user", "content": "你好"}], + [{"role": "user", "content": "用一句话介绍 FastDeploy"}], +] + + +@pytest.mark.parametrize("messages", valid_prompts) +def test_valid_chat(messages, api_url, headers): + """ + Test valid chat requests. 
+ """ + resp = requests.post(api_url, headers=headers, json={"messages": messages}) + + assert resp.status_code == 200 + validate(instance=resp.json(), schema=chat_response_schema) + + +# ========================== +# Consistency test for repeated runs with fixed payload +# ========================== +def test_consistency_between_runs(api_url, headers, consistent_payload): + """ + Test that two runs with the same fixed input produce similar outputs. + """ + # First request + resp1 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp1.status_code == 200 + result1 = resp1.json() + content1 = result1["choices"][0]["message"]["content"] + + # Second request + resp2 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp2.status_code == 200 + result2 = resp2.json() + content2 = result2["choices"][0]["message"]["content"] + + # Calculate difference rate + diff_rate = calculate_diff_rate(content1, content2) + + # Verify that the difference rate is below the threshold + assert diff_rate < 0.05, f"Output difference too large ({diff_rate:.4%})" + + +# ========================== +# Invalid prompt tests +# ========================== + +invalid_prompts = [ + [], # Empty array + [{}], # Empty object + [{"role": "user"}], # Missing content + [{"content": "hello"}], # Missing role +] + + +@pytest.mark.parametrize("messages", invalid_prompts) +def test_invalid_chat(messages, api_url, headers): + """ + Test invalid chat inputs + """ + resp = requests.post(api_url, headers=headers, json={"messages": messages}) + assert resp.status_code >= 400, "Invalid request should return an error status code" + + +# ========================== +# Test for input exceeding context length +# ========================== + + +def test_exceed_context_length(api_url, headers): + """ + Test case for inputs that exceed the model's maximum context length. + """ + # Construct an overly long message + long_content = "你好," * 20000 + + messages = [{"role": "user", "content": long_content}] + + resp = requests.post(api_url, headers=headers, json={"messages": messages}) + + # Check if the response indicates a token limit error or server error (500) + try: + response_json = resp.json() + except Exception: + response_json = {} + + # Check status code and response content + assert ( + resp.status_code != 200 or "token" in json.dumps(response_json).lower() + ), f"Expected token limit error or similar, but got a normal response: {response_json}" + + +# ========================== +# Multi-turn Conversation Test +# ========================== +def test_multi_turn_conversation(api_url, headers): + """ + Test whether multi-turn conversation context is effective. + """ + messages = [ + {"role": "user", "content": "你是谁?"}, + {"role": "assistant", "content": "我是AI助手"}, + {"role": "user", "content": "你能做什么?"}, + ] + resp = requests.post(api_url, headers=headers, json={"messages": messages}) + assert resp.status_code == 200 + validate(instance=resp.json(), schema=chat_response_schema) + + +# ========================== +# Concurrent Performance Test +# ========================== +def test_concurrent_perf(api_url, headers): + """ + Send concurrent requests to test stability and response time. 
+ """ + prompts = [{"role": "user", "content": "Introduce FastDeploy."}] + + def send_request(): + """ + Send a single request + """ + resp = requests.post(api_url, headers=headers, json={"messages": prompts}) + assert resp.status_code == 200 + return resp.elapsed.total_seconds() + + with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(send_request) for _ in range(8)] + durations = [f.result() for f in futures] + + print("\nResponse time for each request:", durations) + + +# ========================== +# Metrics Endpoint Test +# ========================== + + +def test_metrics_endpoint(metrics_url): + """ + Test the metrics monitoring endpoint. + """ + resp = requests.get(metrics_url, timeout=5) + + assert resp.status_code == 200, f"Unexpected status code: {resp.status_code}" + assert "text/plain" in resp.headers["Content-Type"], "Content-Type is not text/plain" + + # Parse Prometheus metrics data + metrics_data = resp.text + lines = metrics_data.split("\n") + + metric_lines = [line for line in lines if not line.startswith("#") and line.strip() != ""] + + # 断言 具体值 + num_requests_running_found = False + num_requests_waiting_found = False + time_to_first_token_seconds_sum_found = False + time_per_output_token_seconds_sum_found = False + e2e_request_latency_seconds_sum_found = False + request_inference_time_seconds_sum_found = False + request_queue_time_seconds_sum_found = False + request_prefill_time_seconds_sum_found = False + request_decode_time_seconds_sum_found = False + prompt_tokens_total_found = False + generation_tokens_total_found = False + request_prompt_tokens_sum_found = False + request_generation_tokens_sum_found = False + gpu_cache_usage_perc_found = False + request_params_max_tokens_sum_found = False + request_success_total_found = False + cache_config_info_found = False + available_batch_size_found = False + hit_req_rate_found = False + hit_token_rate_found = False + cpu_hit_token_rate_found = False + gpu_hit_token_rate_found = False + + for line in metric_lines: + if line.startswith("fastdeploy:num_requests_running"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "num_requests_running 值错误" + num_requests_running_found = True + elif line.startswith("fastdeploy:num_requests_waiting"): + _, value = line.rsplit(" ", 1) + num_requests_waiting_found = True + assert float(value) >= 0, "num_requests_waiting 值错误" + elif line.startswith("fastdeploy:time_to_first_token_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "time_to_first_token_seconds_sum 值错误" + time_to_first_token_seconds_sum_found = True + elif line.startswith("fastdeploy:time_per_output_token_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "time_per_output_token_seconds_sum 值错误" + time_per_output_token_seconds_sum_found = True + elif line.startswith("fastdeploy:e2e_request_latency_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "e2e_request_latency_seconds_sum_found 值错误" + e2e_request_latency_seconds_sum_found = True + elif line.startswith("fastdeploy:request_inference_time_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_inference_time_seconds_sum 值错误" + request_inference_time_seconds_sum_found = True + elif line.startswith("fastdeploy:request_queue_time_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_queue_time_seconds_sum 值错误" + request_queue_time_seconds_sum_found = True + elif 
line.startswith("fastdeploy:request_prefill_time_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_prefill_time_seconds_sum 值错误" + request_prefill_time_seconds_sum_found = True + elif line.startswith("fastdeploy:request_decode_time_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_decode_time_seconds_sum 值错误" + request_decode_time_seconds_sum_found = True + elif line.startswith("fastdeploy:prompt_tokens_total"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "prompt_tokens_total 值错误" + prompt_tokens_total_found = True + elif line.startswith("fastdeploy:generation_tokens_total"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "generation_tokens_total 值错误" + generation_tokens_total_found = True + elif line.startswith("fastdeploy:request_prompt_tokens_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_prompt_tokens_sum 值错误" + request_prompt_tokens_sum_found = True + elif line.startswith("fastdeploy:request_generation_tokens_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_generation_tokens_sum 值错误" + request_generation_tokens_sum_found = True + elif line.startswith("fastdeploy:gpu_cache_usage_perc"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "gpu_cache_usage_perc 值错误" + gpu_cache_usage_perc_found = True + elif line.startswith("fastdeploy:request_params_max_tokens_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_params_max_tokens_sum 值错误" + request_params_max_tokens_sum_found = True + elif line.startswith("fastdeploy:request_success_total"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_success_total 值错误" + request_success_total_found = True + elif line.startswith("fastdeploy:cache_config_info"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "cache_config_info 值错误" + cache_config_info_found = True + elif line.startswith("fastdeploy:available_batch_size"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "available_batch_size 值错误" + available_batch_size_found = True + elif line.startswith("fastdeploy:hit_req_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "hit_req_rate 值错误" + hit_req_rate_found = True + elif line.startswith("fastdeploy:hit_token_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "hit_token_rate 值错误" + hit_token_rate_found = True + elif line.startswith("fastdeploy:cpu_hit_token_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "cpu_hit_token_rate 值错误" + cpu_hit_token_rate_found = True + elif line.startswith("fastdeploy:gpu_hit_token_rate"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "gpu_hit_token_rate 值错误" + gpu_hit_token_rate_found = True + assert num_requests_running_found, "缺少 fastdeploy:num_requests_running 指标" + assert num_requests_waiting_found, "缺少 fastdeploy:num_requests_waiting 指标" + assert time_to_first_token_seconds_sum_found, "缺少 fastdeploy:time_to_first_token_seconds_sum 指标" + assert time_per_output_token_seconds_sum_found, "缺少 fastdeploy:time_per_output_token_seconds_sum 指标" + assert e2e_request_latency_seconds_sum_found, "缺少 fastdeploy:e2e_request_latency_seconds_sum_found 指标" + assert request_inference_time_seconds_sum_found, "缺少 fastdeploy:request_inference_time_seconds_sum 指标" + assert request_queue_time_seconds_sum_found, "缺少 fastdeploy:request_queue_time_seconds_sum 指标" + assert request_prefill_time_seconds_sum_found, "缺少 
fastdeploy:request_prefill_time_seconds_sum 指标"
+    assert request_decode_time_seconds_sum_found, "缺少 fastdeploy:request_decode_time_seconds_sum 指标"
+    assert prompt_tokens_total_found, "缺少 fastdeploy:prompt_tokens_total 指标"
+    assert generation_tokens_total_found, "缺少 fastdeploy:generation_tokens_total 指标"
+    assert request_prompt_tokens_sum_found, "缺少 fastdeploy:request_prompt_tokens_sum 指标"
+    assert request_generation_tokens_sum_found, "缺少 fastdeploy:request_generation_tokens_sum 指标"
+    assert gpu_cache_usage_perc_found, "缺少 fastdeploy:gpu_cache_usage_perc 指标"
+    assert request_params_max_tokens_sum_found, "缺少 fastdeploy:request_params_max_tokens_sum 指标"
+    assert request_success_total_found, "缺少 fastdeploy:request_success_total 指标"
+    assert cache_config_info_found, "缺少 fastdeploy:cache_config_info 指标"
+    assert available_batch_size_found, "缺少 fastdeploy:available_batch_size 指标"
+    assert hit_req_rate_found, "缺少 fastdeploy:hit_req_rate 指标"
+    assert hit_token_rate_found, "缺少 fastdeploy:hit_token_rate 指标"
+    assert cpu_hit_token_rate_found, "缺少 fastdeploy:cpu_hit_token_rate 指标"
+    assert gpu_hit_token_rate_found, "缺少 fastdeploy:gpu_hit_token_rate 指标"
+
+
+# ==========================
+# OpenAI Client chat.completions Test
+# ==========================
+
+
+@pytest.fixture
+def openai_client():
+    ip = "0.0.0.0"
+    service_http_port = str(FD_API_PORT)
+    client = openai.Client(
+        base_url=f"http://{ip}:{service_http_port}/v1",
+        api_key="EMPTY_API_KEY",
+    )
+    return client
+
+
+# Non-streaming test
+def test_non_streaming_chat(openai_client):
+    """Test non-streaming chat functionality with the local service"""
+    response = openai_client.chat.completions.create(
+        model="default",
+        messages=[
+            {"role": "system", "content": "You are a helpful AI assistant."},
+            {"role": "user", "content": "List 3 countries and their capitals."},
+        ],
+        temperature=1,
+        max_tokens=1024,
+        stream=False,
+    )
+
+    assert hasattr(response, "choices")
+    assert len(response.choices) > 0
+    assert hasattr(response.choices[0], "message")
+    assert hasattr(response.choices[0].message, "content")
+
+
+# Streaming test
+def test_streaming_chat(openai_client, capsys):
+    """Test streaming chat functionality with the local service"""
+    response = openai_client.chat.completions.create(
+        model="default",
+        messages=[
+            {"role": "system", "content": "You are a helpful AI assistant."},
+            {"role": "user", "content": "List 3 countries and their capitals."},
+            {
+                "role": "assistant",
+                "content": "China(Beijing), France(Paris), Australia(Canberra).",
+            },
+            {"role": "user", "content": "OK, tell more."},
+        ],
+        temperature=1,
+        max_tokens=1024,
+        stream=True,
+    )
+
+    output = []
+    for chunk in response:
+        if hasattr(chunk.choices[0], "delta") and hasattr(chunk.choices[0].delta, "content"):
+            output.append(chunk.choices[0].delta.content)
+    assert len(output) > 2
+
+
+# ==========================
+# OpenAI Client completions Test
+# ==========================
+
+
+def test_non_streaming(openai_client):
+    """Test non-streaming chat functionality with the local service"""
+    response = openai_client.completions.create(
+        model="default",
+        prompt="Hello, how are you?",
+        temperature=1,
+        max_tokens=1024,
+        stream=False,
+    )
+
+    # Assertions to check the response structure
+    assert hasattr(response, "choices")
+    assert len(response.choices) > 0
+
+
+def test_streaming(openai_client, capsys):
+    """Test streaming functionality with the local service"""
+    response = openai_client.completions.create(
+        model="default",
+        prompt="Hello, how are you?",
temperature=1, + max_tokens=1024, + stream=True, + ) + + # Collect streaming output + output = [] + for chunk in response: + output.append(chunk.choices[0].text) + assert len(output) > 0 + + +def test_profile_reset_block_num(): + """测试profile reset_block_num功能,与baseline diff不能超过5%""" + log_file = "./log/config.log" + baseline = 32562 + + if not os.path.exists(log_file): + pytest.fail(f"Log file not found: {log_file}") + + with open(log_file, "r") as f: + log_lines = f.readlines() + + target_line = None + for line in log_lines: + if "Reset block num" in line: + target_line = line.strip() + break + + if target_line is None: + pytest.fail("日志中没有Reset block num信息") + + match = re.search(r"total_block_num:(\d+)", target_line) + if not match: + pytest.fail(f"Failed to extract total_block_num from line: {target_line}") + + try: + actual_value = int(match.group(1)) + except ValueError: + pytest.fail(f"Invalid number format: {match.group(1)}") + + lower_bound = baseline * (1 - 0.05) + upper_bound = baseline * (1 + 0.05) + print(f"Reset total_block_num: {actual_value}. baseline: {baseline}") + + assert lower_bound <= actual_value <= upper_bound, ( + f"Reset total_block_num {actual_value} 与 baseline {baseline} diff需要在5%以内" + f"Allowed range: [{lower_bound:.1f}, {upper_bound:.1f}]" + ) + + +def test_prompt_token_ids_in_non_streaming_completion(openai_client): + """ + Test cases for passing token ids through `prompt`/`prompt_token_ids` in non-streaming completion api + """ + # Test case for passing a token id list in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937]}, + stream=False, + ) + assert len(response.choices) == 1 + assert response.usage.prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]]}, + stream=False, + ) + assert len(response.choices) == 2 + assert response.usage.prompt_tokens == 9 + 3 + + # Test case for passing a token id list in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], + temperature=1, + max_tokens=5, + stream=False, + ) + assert len(response.choices) == 1 + assert response.usage.prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]], + temperature=1, + max_tokens=5, + stream=False, + ) + assert len(response.choices) == 2 + assert response.usage.prompt_tokens == 9 + 3 + + +def test_prompt_token_ids_in_streaming_completion(openai_client): + """ + Test cases for passing token ids through `prompt`/`prompt_token_ids` in streaming completion api + """ + # Test case for passing a token id list in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937]}, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + 
sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]]}, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + 3 + + # Test case for passing a token id list in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], + temperature=1, + max_tokens=5, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]], + temperature=1, + max_tokens=5, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + 3 diff --git a/tests/metrics/test_new_metrics.py b/tests/metrics/test_new_metrics.py new file mode 100644 index 000000000..bbcaba3fc --- /dev/null +++ b/tests/metrics/test_new_metrics.py @@ -0,0 +1,92 @@ +import unittest +from unittest.mock import MagicMock, patch + +from fastdeploy.cache_manager.cache_metrics import CacheMetrics +from fastdeploy.output.token_processor import TokenProcessor + + +class TestCoverageFix(unittest.TestCase): + @patch("fastdeploy.cache_manager.cache_metrics.main_process_metrics") + def test_cache_metrics_update_history(self, mock_main_process_metrics): + """ + 测试 CacheMetrics._update_history_hit_metrics 方法。 + + 目标:确保 main_process_metrics 的 .set() 方法被正确调用,覆盖第 58-61 行。 + """ + print("\nRunning test for CacheMetrics._update_history_hit_metrics...") + metrics = CacheMetrics() + + # 准备数据以避免除零错误 + metrics.req_count = 20 + metrics.hit_req_count = 10 + metrics.total_token_num = 1000 + metrics.total_cpu_matched_token_num = 250 + metrics.total_gpu_matched_token_num = 350 + metrics.matched_token_num = metrics.total_cpu_matched_token_num + metrics.total_gpu_matched_token_num + + # 调用目标方法 + metrics._update_history_hit_metrics() + + # 断言 Prometheus 指标的 set 方法是否被正确的值调用 + mock_main_process_metrics.hit_req_rate.set.assert_called_once_with(0.5) # 10 / 20 + mock_main_process_metrics.hit_token_rate.set.assert_called_once_with(0.6) # 600 / 1000 + mock_main_process_metrics.cpu_hit_token_rate.set.assert_called_once_with(0.25) # 250 / 1000 + mock_main_process_metrics.gpu_hit_token_rate.set.assert_called_once_with(0.35) # 350 / 1000 + + print("Test for CacheMetrics passed.") + + def setUp(self): + """为 TokenProcessor 测试设置通用的 mock 对象。""" + self.mock_cfg = MagicMock() + self.mock_cached_generated_tokens = MagicMock() + self.mock_engine_worker_queue = MagicMock() + self.mock_split_connector = MagicMock() + self.mock_resource_manager = MagicMock() + + with patch("fastdeploy.output.token_processor.IPCSignal"): + 
self.processor = TokenProcessor( + cfg=self.mock_cfg, + cached_generated_tokens=self.mock_cached_generated_tokens, + engine_worker_queue=self.mock_engine_worker_queue, + split_connector=self.mock_split_connector, + ) + self.processor.resource_manager = self.mock_resource_manager + + # 使用 patch 来模拟 token_processor 模块中引用的 main_process_metrics + @patch("fastdeploy.output.token_processor.main_process_metrics") + def test_recycle_resources_updates_metrics(self, mock_main_process_metrics): + """ + 测试 TokenProcessor._recycle_resources 方法。 + + 目标:确保 available_batch_size 等指标被更新,覆盖第 285 行左右的代码。 + """ + print("\nRunning test for TokenProcessor._recycle_resources (metric update)...") + + # 1. 准备测试数据和 mock 行为 + task_id = "request-456" + index = 0 + mock_task = MagicMock() + + # 配置 resource_manager 的 mock 返回值 + self.mock_resource_manager.available_batch.return_value = 8 + self.mock_resource_manager.total_block_number.return_value = 1024 + self.mock_resource_manager.max_num_seqs = 16 + + # _recycle_resources 方法内部会操作这些列表/字典 + self.mock_resource_manager.tasks_list = [mock_task] + self.mock_resource_manager.stop_flags = [False] + + # 为了避免 del self.tokens_counter[task_id] 抛出 KeyError + self.processor.tokens_counter[task_id] = 5 + + # 调用目标方法 + self.processor._recycle_resources(task_id=task_id, index=index, task=mock_task, result=None, is_prefill=False) + + # 核心断言:验证 available_batch_size 指标是否被正确设置 + mock_main_process_metrics.available_batch_size.set.assert_called_once_with(8) + + print("Test for TokenProcessor passed.") + + +if __name__ == "__main__": + unittest.main()
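
Note on the `fastdeploy:cache_config_info` metric added in this patch: it follows the common Prometheus "info" pattern, where the gauge's value is fixed at 1 and the configuration itself travels in the labels. Below is a minimal, self-contained sketch of that pattern using `prometheus_client`; the label names (`block_size`, `gpu_memory_utilization`) are illustrative placeholders, not necessarily the fields returned by FastDeploy's actual `CacheConfig.metrics_info()`.

```python
# Minimal sketch of the labels-as-value ("info") gauge pattern used by
# MetricsManager.set_cache_config_info(). Assumes prometheus_client is installed;
# label names here are hypothetical, for illustration only.
from prometheus_client import CollectorRegistry, Gauge, generate_latest

registry = CollectorRegistry()

cache_config_info = Gauge(
    name="fastdeploy:cache_config_info",
    documentation="Information of the engine's CacheConfig",
    labelnames=["block_size", "gpu_memory_utilization"],  # assumed label set
    registry=registry,
)

# The sample value is always 1; the configuration is carried entirely by the labels.
cache_config_info.labels(block_size="64", gpu_memory_utilization="0.9").set(1)

# A scrape of /metrics would then expose something like:
#   fastdeploy:cache_config_info{block_size="64",gpu_memory_utilization="0.9"} 1.0
print(generate_latest(registry).decode())
```

Keeping the sample value constant at 1 lets the configuration labels be joined onto other series in PromQL queries without affecting their values.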