[CI] Update PORT range to avoid conflict with system ports (#4953)

This commit is contained in:
YuBaoku
2025-11-12 11:17:49 +08:00
committed by GitHub
parent 09cd6c5d3e
commit 8a96944a0a
8 changed files with 91 additions and 55 deletions

View File

@@ -76,11 +76,11 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"

View File

@@ -76,11 +76,11 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"

View File

@@ -68,11 +68,11 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"

View File

@@ -77,14 +77,14 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FD_ZMQ_RECV_REQUEST_SERVER_PORT=$((42048 + DEVICE_PORT * 100))
FD_ZMQ_SEND_RESPONSE_SERVER_PORT=$((42038 + DEVICE_PORT * 100))
FD_ZMQ_CONTROL_CMD_SERVER_PORTS=$((42028 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
FD_ZMQ_RECV_REQUEST_SERVER_PORT=$((8048 + DEVICE_PORT * 100))
FD_ZMQ_SEND_RESPONSE_SERVER_PORT=$((8038 + DEVICE_PORT * 100))
FD_ZMQ_CONTROL_CMD_SERVER_PORTS=$((8028 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"

View File

@@ -76,12 +76,12 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42038 + DEVICE_PORT * 100))
FD_INFERENCE_MSG_QUEUE_ID=$(( 42048 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8038 + DEVICE_PORT * 100))
FD_INFERENCE_MSG_QUEUE_ID=$(( 8048 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"

View File

@@ -97,13 +97,13 @@ jobs:
DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,)
DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1)
FLASK_PORT=$((42068 + DEVICE_PORT * 100))
FD_API_PORT=$((42088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FD_ROUTER_PORT=$((42048 + DEVICE_PORT * 100))
FD_CONNECTOR_PORT=$((42038 + DEVICE_PORT * 100))
FLASK_PORT=$((8068 + DEVICE_PORT * 100))
FD_API_PORT=$((8088 + DEVICE_PORT * 100))
FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
FD_ROUTER_PORT=$((8048 + DEVICE_PORT * 100))
FD_CONNECTOR_PORT=$((8038 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"

View File

@@ -32,10 +32,10 @@ class FDRunner:
from fastdeploy.entrypoints.llm import LLM
ports_to_clean = []
if "engine_worker_queue_port" in kwargs:
ports_to_clean.append(kwargs["engine_worker_queue_port"])
port_keys = ["engine_worker_queue_port", "cache_queue_port", "port", "metrics_port"]
ports_to_clean.extend(kwargs[k] for k in port_keys if k in kwargs)
clean_ports(ports_to_clean)
time.sleep(5)
time.sleep(10)
graph_optimization_config = {"use_cudagraph": False}
self.llm = LLM(
model=model_name_or_path,

View File

@@ -17,6 +17,7 @@ import shutil
import signal
import socket
import subprocess
import time
import traceback
from multiprocessing import Process, Queue
@@ -147,37 +148,72 @@ def form_model_get_output_topp1(
def _tool_stdout(cmd: list[str], timeout: float = 5.0) -> str:
    """Run *cmd* without a shell and return its stdout.

    Returns an empty string when the tool is not installed, cannot be
    started, or does not finish within *timeout* seconds. A non-zero exit
    status is not an error here: lsof and fuser both exit non-zero when
    nothing matches.
    """
    try:
        completed = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        return ""
    return completed.stdout.decode(errors="replace")


def _parse_pids(text: str) -> set[int]:
    """Return the positive integer tokens found in whitespace-separated *text*."""
    # PID 0 is dropped on purpose: os.kill(0, sig) would signal our whole process group.
    return {int(token) for token in text.split() if token.isdigit() and int(token) > 0}


def _pids_holding_port(port: int) -> set[int]:
    """Collect the PIDs that lsof, netstat, or fuser report for *port*."""
    # lsof: -t prints bare PIDs; -nP skips DNS and service-name lookups that can stall.
    pids = _parse_pids(_tool_stdout(["lsof", "-nP", f"-i:{port}", "-t"]))

    # netstat: compare the port of the local-address column exactly, so that
    # cleaning port 80 never matches a socket on 8080.
    wanted = str(port)
    for row in _tool_stdout(["netstat", "-tulpn"]).splitlines():
        cols = row.split()
        if len(cols) < 6 or cols[3].rpartition(":")[2] != wanted:
            continue
        # The "PID/Program" column is the 7th on TCP rows but the 6th on UDP
        # rows (no State column), so look for the first "<digits>/..." field.
        for col in cols[5:]:
            pid_text, slash, _ = col.partition("/")
            if slash and pid_text.isdigit() and int(pid_text) > 0:
                pids.add(int(pid_text))
                break

    # fuser: only list the PIDs (stdout) instead of `fuser -k`, so the
    # self/parent guard in the caller applies to these PIDs as well.
    pids |= _parse_pids(_tool_stdout(["fuser", f"{port}/tcp"]))
    return pids


def _signal_pid(pid: int, sig: int) -> bool:
    """Send *sig* to *pid*; return True only if the signal was delivered."""
    try:
        os.kill(pid, sig)
    except ProcessLookupError:
        return False  # Process already terminated
    except PermissionError:
        print(f"No permission to signal pid={pid}")
        return False
    return True


def kill_process_on_port(port: int):
    """
    Kill processes that are using the given port.

    PIDs are gathered from lsof, netstat and fuser (whichever are installed),
    the current process and its parent are always spared, and the remaining
    processes receive SIGTERM followed, one second later, by SIGKILL.
    This is best-effort cleanup: missing tools, vanished processes and
    permission errors are tolerated rather than raised.

    Args:
        port: Port number to free.
    """
    protected = {os.getpid(), os.getppid()}
    candidates = _pids_holding_port(port)

    for pid in sorted(candidates & protected):
        print(f"Skip killing current process (pid={pid}) on port {port}")

    # First try SIGTERM for graceful shutdown
    signalled = [pid for pid in sorted(candidates - protected) if _signal_pid(pid, signal.SIGTERM)]
    if not signalled:
        return

    # One shared grace period, then SIGKILL whatever is still running
    time.sleep(1)
    for pid in signalled:
        if _signal_pid(pid, signal.SIGKILL):
            print(f"Killed process on port {port}, pid={pid}")
def clean_ports(ports_to_clean: list[int]):
    """
    Kill all processes occupying the ports listed in ``ports_to_clean``.

    Every port is cleaned once via ``kill_process_on_port``. After a short
    settle delay, any port that still accepts connections on 127.0.0.1 is
    cleaned a second time.

    Args:
        ports_to_clean: Port numbers to free. Duplicates are ignored; an
            empty (or None) value returns immediately without sleeping.
    """
    # dict.fromkeys de-duplicates while keeping the caller's order.
    ports = list(dict.fromkeys(ports_to_clean or []))
    print(f"Cleaning ports: {ports}")
    if not ports:
        return

    for port in ports:
        kill_process_on_port(port)

    # Double check and retry if ports are still in use
    time.sleep(2)
    stubborn = [port for port in ports if is_port_open("127.0.0.1", port, timeout=0.1)]
    for port in stubborn:
        print(f"Port {port} still in use, retrying cleanup...")
        kill_process_on_port(port)
    if stubborn:
        # Give the retried processes a moment to release their sockets.
        time.sleep(1)
def is_port_open(host: str, port: int, timeout=1.0):
"""