[CI] Add cleanup to run-batch CI (#5307)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled
Publish Job / publish_pre_check (push) Has been cancelled
Publish Job / print_publish_pre_check_outputs (push) Has been cancelled
Publish Job / FD-Clone-Linux (push) Has been cancelled
Publish Job / Show Code Archive Output (push) Has been cancelled
Publish Job / BUILD_SM8090 (push) Has been cancelled
Publish Job / BUILD_SM8689 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8090 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8689 (push) Has been cancelled
Publish Job / Run FD Image Build (push) Has been cancelled
Publish Job / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
Publish Job / Run FastDeploy LogProb Tests (push) Has been cancelled
Publish Job / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
Publish Job / Run Base Tests (push) Has been cancelled
Publish Job / Run Accuracy Tests (push) Has been cancelled
Publish Job / Run Stable Tests (push) Has been cancelled
CI Images Build / FD-Clone-Linux (push) Has been cancelled
CI Images Build / Show Code Archive Output (push) Has been cancelled
CI Images Build / CI Images Build (push) Has been cancelled
CI Images Build / BUILD_SM8090 (push) Has been cancelled
CI Images Build / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
CI Images Build / Run FastDeploy LogProb Tests (push) Has been cancelled
CI Images Build / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
CI Images Build / Run Base Tests (push) Has been cancelled
CI Images Build / Publish Docker Images Pre Check (push) Has been cancelled

This commit is contained in:
xiaolei373
2025-12-01 21:18:19 +08:00
committed by GitHub
parent aa35ce449d
commit 84e2f6aa75

View File

@@ -1,8 +1,13 @@
import asyncio
import json
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time
import unittest
from http import HTTPStatus
from unittest.mock import AsyncMock, MagicMock, Mock, mock_open, patch
@@ -42,6 +47,125 @@ from fastdeploy.entrypoints.openai.run_batch import (
write_local_file,
)
# Read service ports from environment variables; fall back to defaults if unset.
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
# Ports that must be freed before and after each test run.
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
def is_port_open(host: str, port: int, timeout=1.0):
    """
    Probe whether a TCP port accepts connections on the given host.

    Returns True if a connection is established within *timeout* seconds,
    False on any failure (refused, timed out, unresolvable, ...).
    """
    try:
        conn = socket.create_connection((host, port), timeout)
    except Exception:
        return False
    with conn:
        return True
def _clean_cuda_process():
    """
    Kill processes that are using CUDA devices (best effort).

    NOTE: Do not call this function directly, use the `clean` function instead —
    it is gated behind an env flag because it can kill unrelated GPU processes.
    """
    kill_cmd = "fuser -k /dev/nvidia*"
    try:
        subprocess.run(kill_cmd, shell=True, timeout=5)
    except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
        # fuser missing, failed, or hung — this is best-effort cleanup only.
        pass
def kill_process_on_port(port: int):
    """
    Kill processes that are listening on the given port.

    Tries three independent discovery/kill strategies (lsof, netstat, fuser)
    so cleanup still works when some tools are unavailable. The current test
    process and its parent are never killed.
    """
    protected = (os.getpid(), os.getppid())

    # Method 1: lsof discovery; SIGTERM first for a graceful stop, then SIGKILL.
    try:
        listing = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
    except subprocess.CalledProcessError:
        listing = ""  # lsof found nothing (or is absent from the shell)
    for line in listing.splitlines():
        pid = int(line)
        if pid in protected:
            print(f"Skip killing current process (pid={pid}) on port {port}")
            continue
        try:
            os.kill(pid, signal.SIGTERM)
            time.sleep(1)  # grace period before the hard kill
            os.kill(pid, signal.SIGKILL)
            print(f"Killed process on port {port}, pid={pid}")
        except ProcessLookupError:
            pass  # already gone

    # Method 2: netstat + awk fallback; straight SIGKILL.
    netstat_cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
    try:
        entries = subprocess.check_output(netstat_cmd, shell=True).decode().strip().splitlines()
    except (subprocess.CalledProcessError, FileNotFoundError):
        entries = []
    for entry in entries:
        if not (entry and entry.isdigit()):
            continue  # netstat column may contain "-" for unowned sockets
        pid = int(entry)
        if pid in protected:
            continue
        try:
            os.kill(pid, signal.SIGKILL)
            print(f"Killed process (netstat) on port {port}, pid={pid}")
        except ProcessLookupError:
            pass

    # Method 3: fuser as the last resort, best effort with a short timeout.
    try:
        subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
    except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
        pass
def clean_ports(ports=None):
    """
    Kill all processes occupying the given ports (defaults to PORTS_TO_CLEAN),
    then verify the ports are actually free and retry once per stubborn port.
    """
    targets = PORTS_TO_CLEAN if ports is None else ports
    print(f"Cleaning ports: {targets}")
    for port in targets:
        kill_process_on_port(port)

    # Give the OS a moment to release sockets, then re-check each port.
    time.sleep(2)
    for port in targets:
        if not is_port_open("127.0.0.1", port, timeout=0.1):
            continue
        print(f"Port {port} still in use, retrying cleanup...")
        kill_process_on_port(port)
        time.sleep(1)
def clean(ports=None):
    """
    Clean up resources used during testing: free ports and, optionally,
    kill CUDA-holding processes.
    """
    clean_ports(ports)
    # CUDA cleanup is opt-in via CLEAN_CUDA=1.
    # NOTE: dangerous on shared development machines — it may kill other
    # people's GPU processes — hence the explicit flag.
    if int(os.getenv("CLEAN_CUDA", "0")) == 1:
        _clean_cuda_process()
INPUT_BATCH = """
{"custom_id": "req-00001", "method": "POST", "url": "/v1/chat/completions", "body": {"messages": [{"role": "user", "content": "Can you write a short poem? (id=1)"}], "temperature": 0.7, "max_tokens": 200}}
{"custom_id": "req-00002", "method": "POST", "url": "/v1/chat/completions", "body": {"messages": [{"role": "user", "content": "What can you do? (id=2)"}], "temperature": 0.7, "max_tokens": 200}}
@@ -1249,28 +1373,63 @@ class TestBatchProgressTrackerExtended(unittest.TestCase):
mock_logger.info.assert_called_with(f"Progress: {i}/100 requests completed")
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
class TestFastDeployBatch(unittest.TestCase):
"""测试 FastDeploy 批处理功能的 unittest 测试类"""
def setUp(self):
    """Per-test setup: wipe stale logs, free ports, and configure commands."""
    print("\n[SetUp] Pre-test cleanup...")
    # 1. Remove the log directory left over from a previous run.
    if os.path.exists("log") and os.path.isdir("log"):
        shutil.rmtree("log")
    # 2. Free every port the server under test will need.
    clean_ports()
    # 3. Model path and launch commands.
    self.model_path = "baidu/ERNIE-4.5-0.3B-PT"
    self.base_command = ["fastdeploy", "run-batch"]
    # Use sys.executable so the child runs under the same interpreter as the
    # test process (a bare "python" may resolve to a different environment).
    # (Removed a dead assignment that hard-coded "python" and was immediately
    # overwritten by this line.)
    self.run_batch_command = [sys.executable, "fastdeploy/entrypoints/openai/run_batch.py"]
    # Track every spawned subprocess so tearDown can terminate it.
    self.subprocesses = []
def tearDown(self):
    """Per-test cleanup: terminate every subprocess group started by the test."""
    print("\n[TearDown] executing cleanup...")
    for proc in self.subprocesses:
        try:
            if proc.poll() is not None:
                continue  # process already exited on its own
            print(f"Terminating process group (PGID: {proc.pid})...")
            # Kill the whole process group so grandchildren are cleaned up too.
            os.killpg(proc.pid, signal.SIGTERM)
            # Poll for exit; escalate to SIGKILL after a 5-second grace period.
            deadline = time.time() + 5
            while proc.poll() is None:
                if time.time() > deadline:
                    print(f"Process group (PGID: {proc.pid}) timed out, forcing SIGKILL...")
                    os.killpg(proc.pid, signal.SIGKILL)
                    break
                time.sleep(0.1)
            proc.wait()
        except Exception as e:
            print(f"Error cleaning up process (PID: {proc.pid}): {e}")
def run_fastdeploy_command(self, input_content, port=None):
"""运行 FastDeploy 命令的辅助方法"""
if port is None:
port = str(FD_CACHE_QUEUE_PORT)
with tempfile.NamedTemporaryFile("w") as input_file, tempfile.NamedTemporaryFile("r") as output_file:
input_file.write(input_content)
input_file.flush()
with (
tempfile.NamedTemporaryFile("w", delete=False) as input_file,
tempfile.NamedTemporaryFile("r", delete=False) as output_file,
):
param = [
"-i",
@@ -1294,23 +1453,75 @@ class TestFastDeployBatch(unittest.TestCase):
"--engine-worker-queue-port",
str(FD_ENGINE_QUEUE_PORT),
]
input_path = input_file.name
output_path = output_file.name
# command = self.base_command + param
run_batch_command = self.run_batch_command + param
try:
input_file.write(input_content)
input_file.flush()
proc = subprocess.Popen(run_batch_command)
proc.communicate()
return_code = proc.wait()
param = [
"-i",
input_path,
"-o",
output_path,
"--model",
self.model_path,
"--cache-queue-port",
port,
"--tensor-parallel-size",
"1",
"--quantization",
"wint4",
"--max-model-len",
"5120",
"--max-num-seqs",
"64",
"--load-choices",
"default_v1",
"--engine-worker-queue-port",
str(FD_ENGINE_QUEUE_PORT),
]
# 读取输出文件内容
output_file.seek(0)
contents = output_file.read()
run_batch_command = self.run_batch_command + param
return return_code, contents, proc
print(f"Executing command: {' '.join(run_batch_command)}")
proc = subprocess.Popen(
run_batch_command,
# stdout=logfile,
# stderr=subprocess.STDOUT,
start_new_session=True,
)
self.subprocesses.append(proc)
try:
proc.wait(timeout=300) # 等待最多 5 分钟
except subprocess.TimeoutExpired:
print("[TIMEOUT] run_batch command timed out.")
os.killpg(proc.pid, signal.SIGKILL)
raise
return_code = proc.returncode
# 读取输出结果
output_file.seek(0)
contents = output_file.read()
return return_code, contents, proc
finally:
# 清理临时文件
if os.path.exists(input_path):
os.unlink(input_path)
if os.path.exists(output_path):
os.unlink(output_path)
def test_completions(self):
    """Batch chat-completion requests should finish with a zero exit code."""
    rc, _contents, process = self.run_fastdeploy_command(INPUT_BATCH, port="2235")
    print(f"进程输出: {rc}")
    self.assertEqual(rc, 0, f"进程返回非零码: {rc}, 进程信息: {process}")