diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py index fb13301c4..b5dff2029 100644 --- a/benchmarks/benchmark_serving.py +++ b/benchmarks/benchmark_serving.py @@ -982,7 +982,7 @@ def main(args: argparse.Namespace): if args.result_dir: file_name = os.path.join(args.result_dir, file_name) with open(file_name, "w", encoding="utf-8") as outfile: - json.dump(result_json, outfile) + json.dump(result_json, outfile, ensure_ascii=False) save_to_pytorch_benchmark_format(args, result_json, file_name) diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index eed16986a..dc47107e6 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -277,7 +277,7 @@ class Ernie4_5_VLMoE(nn.Layer): def forward(self, hidden_states: paddle.Tensor, vl_moe_meta: VLMoEMeta): if self.num_shared_experts > 0: shared_experts_out = self.shared_experts(hidden_states) - hidden_states, vl_moe_meta.text_input, vl_moe_meta.image_input = text_image_gather_scatter( + hidden_states, text_input, image_input = text_image_gather_scatter( hidden_states, vl_moe_meta.text_input, vl_moe_meta.image_input, @@ -286,8 +286,8 @@ class Ernie4_5_VLMoE(nn.Layer): vl_moe_meta.image_index, True, ) - text_out = self.text_fused_moe(vl_moe_meta.text_input) - image_out = self.image_fused_moe(vl_moe_meta.image_input) + text_out = self.text_fused_moe(text_input) + image_out = self.image_fused_moe(image_input) hidden_states, _, _ = text_image_gather_scatter( hidden_states, text_out, diff --git a/tests/e2e/test_EB_VL_Lite_sot_serving.py b/tests/e2e/test_EB_VL_Lite_sot_serving.py new file mode 100644 index 000000000..b13eb956a --- /dev/null +++ b/tests/e2e/test_EB_VL_Lite_sot_serving.py @@ -0,0 +1,450 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import shutil +import signal +import socket +import subprocess +import sys +import time + +import openai +import pytest + +# Read ports from environment variables; use default values if not set +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) +FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT] + +os.environ["FD_USE_MACHETE"] = "0" + + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. + """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + + +def kill_process_on_port(port: int): + """ + Kill processes that are listening on the given port. + Uses `lsof` to find process ids and sends SIGKILL. + """ + try: + output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + current_pid = os.getpid() + parent_pid = os.getppid() + for pid in output.splitlines(): + pid = int(pid) + if pid in (current_pid, parent_pid): + print(f"Skip killing current process (pid={pid}) on port {port}") + continue + os.kill(pid, signal.SIGKILL) + print(f"Killed process on port {port}, pid={pid}") + except subprocess.CalledProcessError: + pass + + +def clean_ports(): + """ + Kill all processes occupying the ports listed in PORTS_TO_CLEAN. + """ + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + time.sleep(2) + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports() + print("log dir clean ") + if os.path.exists("log") and os.path.isdir("log"): + shutil.rmtree("log") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ernie-4_5-vl-28b-a3b-bf16-paddle") + else: + model_path = "./ernie-4_5-vl-28b-a3b-bf16-paddle" + + log_path = "server.log" + limit_mm_str = json.dumps({"image": 100, "video": 100}) + + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "2", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--enable-mm", + "--max-model-len", + "8192", + "--max-num-batched-tokens", + "172", + "--max-num-seqs", + "64", + "--limit-mm-per-prompt", + limit_mm_str, + "--enable-chunked-prefill", + "--kv-cache-ratio", + "0.71", + "--quantization", + "wint4", + "--reasoning-parser", + "ernie-45-vl", + "--graph-optimization-config", + '{"graph_opt_level": 1, "use_cudagraph": true, "full_cuda_graph": false}', + ] + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + ) + + # Wait up to 10 minutes for API server to be ready + for _ in range(10 * 60): + if is_port_open("127.0.0.1", FD_API_PORT): + print(f"API server is up on port {FD_API_PORT}") + break + time.sleep(1) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + print(f"API server (pid={process.pid}) terminated") + clean_ports() + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +# ========================== +# OpenAI Client additional chat/completions test +# ========================== +@pytest.fixture +def openai_client(): + ip = "0.0.0.0" + service_http_port = str(FD_API_PORT) + client = openai.Client( + base_url=f"http://{ip}:{service_http_port}/v1", + api_key="EMPTY_API_KEY", + ) + return client + + +def test_non_streaming_chat_with_return_token_ids(openai_client, capsys): + """ + Test return_token_ids option in non-streaming chat functionality with the local service + """ + # 设定 return_token_ids + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, # system不是必需,可选 + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", + "detail": "high", + }, + }, + {"type": "text", "text": "请描述图片内容"}, + ], + }, + ], + temperature=1, + max_tokens=53, + extra_body={"return_token_ids": True}, + stream=False, + ) + assert hasattr(response, "choices") + assert len(response.choices) > 0 + assert hasattr(response.choices[0], "message") + assert hasattr(response.choices[0].message, "prompt_token_ids") + assert isinstance(response.choices[0].message.prompt_token_ids, list) + assert hasattr(response.choices[0].message, "completion_token_ids") + assert isinstance(response.choices[0].message.completion_token_ids, list) + + # 不设定 return_token_ids + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, # system不是必需,可选 + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", + "detail": "high", + }, + }, + {"type": "text", "text": "请描述图片内容"}, + ], + }, + ], + temperature=1, + max_tokens=53, + extra_body={"return_token_ids": False}, + stream=False, + ) + assert hasattr(response, "choices") + assert len(response.choices) > 0 + assert hasattr(response.choices[0], "message") + assert hasattr(response.choices[0].message, "prompt_token_ids") + assert response.choices[0].message.prompt_token_ids is None + assert hasattr(response.choices[0].message, "completion_token_ids") + assert response.choices[0].message.completion_token_ids is None + + +def test_streaming_chat_with_return_token_ids(openai_client, capsys): + """ + Test return_token_ids option in streaming chat functionality with the local service + """ + # enable return_token_ids + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, # system不是必需,可选 + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", + "detail": "high", + }, + }, + {"type": "text", "text": "请描述图片内容"}, + ], + }, + ], + temperature=1, + max_tokens=53, + extra_body={"return_token_ids": True}, + stream=True, + ) + is_first_chunk = True + for chunk in response: + assert hasattr(chunk, "choices") + assert len(chunk.choices) > 0 + assert hasattr(chunk.choices[0], "delta") + assert hasattr(chunk.choices[0].delta, "prompt_token_ids") + assert hasattr(chunk.choices[0].delta, "completion_token_ids") + if is_first_chunk: + is_first_chunk = False + assert isinstance(chunk.choices[0].delta.prompt_token_ids, list) + assert chunk.choices[0].delta.completion_token_ids is None + else: + assert chunk.choices[0].delta.prompt_token_ids is None + assert isinstance(chunk.choices[0].delta.completion_token_ids, list) + + # disable return_token_ids + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, # system不是必需,可选 + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg", + "detail": "high", + }, + }, + {"type": "text", "text": "请描述图片内容"}, + ], + }, + ], + temperature=1, + max_tokens=53, + extra_body={"return_token_ids": False}, + stream=True, + ) + for chunk in response: + assert hasattr(chunk, "choices") + assert len(chunk.choices) > 0 + assert hasattr(chunk.choices[0], "delta") + assert hasattr(chunk.choices[0].delta, "prompt_token_ids") + assert chunk.choices[0].delta.prompt_token_ids is None + assert hasattr(chunk.choices[0].delta, "completion_token_ids") + assert chunk.choices[0].delta.completion_token_ids is None + + +def test_chat_with_thinking(openai_client, capsys): + """ + Test enable_thinking & reasoning_max_tokens option in non-streaming chat functionality with the local service + """ + # enable thinking, non-streaming + response = openai_client.chat.completions.create( + model="default", + messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}], + temperature=1, + stream=False, + max_tokens=10, + extra_body={"chat_template_kwargs": {"enable_thinking": True}}, + ) + assert response.choices[0].message.reasoning_content is not None + + # disable thinking, non-streaming + response = openai_client.chat.completions.create( + model="default", + messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}], + temperature=1, + stream=False, + max_tokens=10, + extra_body={"chat_template_kwargs": {"enable_thinking": False}}, + ) + assert response.choices[0].message.reasoning_content is None + assert "" not in response.choices[0].message.content + + # test logic + reasoning_max_tokens = None + response = openai_client.chat.completions.create( + model="default", + messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}], + temperature=1, + stream=False, + max_tokens=20, + extra_body={ + "chat_template_kwargs": {"enable_thinking": True}, + "reasoning_max_tokens": reasoning_max_tokens, + }, + ) + assert response.choices[0].message.reasoning_content is not None + + # enable thinking, streaming + reasoning_max_tokens = 3 + response = openai_client.chat.completions.create( + model="default", + messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}], + temperature=1, + extra_body={ + "chat_template_kwargs": {"enable_thinking": True}, + "reasoning_max_tokens": reasoning_max_tokens, + "return_token_ids": True, + }, + stream=True, + max_tokens=10, + ) + completion_tokens = 1 + reasoning_tokens = 0 + total_tokens = 0 + for chunk_id, chunk in enumerate(response): + if chunk_id == 0: # the first chunk is an extra chunk + continue + delta_message = chunk.choices[0].delta + if delta_message.content != "" and delta_message.reasoning_content == "": + completion_tokens += len(delta_message.completion_token_ids) + elif delta_message.reasoning_content != "" and delta_message.content == "": + reasoning_tokens += len(delta_message.completion_token_ids) + total_tokens += len(delta_message.completion_token_ids) + assert completion_tokens + reasoning_tokens == total_tokens + assert reasoning_tokens <= reasoning_max_tokens + + +def test_thinking_logic_flag(openai_client, capsys): + """ + Test the interaction between token calculation logic and conditional thinking. + This test covers: + 1. Default max_tokens calculation when not provided. + 2. Capping of max_tokens when it exceeds model limits. + 3. Default reasoning_max_tokens calculation when not provided. + 4. Activation of thinking based on the final state of reasoning_max_tokens. + """ + + response_case_1 = openai_client.chat.completions.create( + model="default", + messages=[{"role": "user", "content": "Explain gravity briefly."}], + temperature=1, + stream=False, + extra_body={ + "chat_template_kwargs": {"enable_thinking": True}, + }, + ) + assert response_case_1.choices[0].message.reasoning_content is not None + + response_case_2 = openai_client.chat.completions.create( + model="default", + messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}], + temperature=1, + stream=False, + max_tokens=20, + extra_body={ + "chat_template_kwargs": {"enable_thinking": True}, + "reasoning_max_tokens": 5, + }, + ) + assert response_case_2.choices[0].message.reasoning_content is not None + + response_case_3 = openai_client.chat.completions.create( + model="default", + messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}], + temperature=1, + stream=False, + max_tokens=20, + extra_body={ + "chat_template_kwargs": {"enable_thinking": False}, + }, + ) + assert response_case_3.choices[0].message.reasoning_content is None