[CI] Optimize port cleanup logic (#4860)

This commit is contained in:
YuBaoku
2025-11-06 19:13:48 +08:00
committed by GitHub
parent 5aa73d32f4
commit a139f8f3cb
12 changed files with 352 additions and 246 deletions

View File

@@ -92,6 +92,9 @@ jobs:
echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}"
echo "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}"
echo "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}"
echo "FD_ZMQ_CONTROL_CMD_SERVER_PORTS=${FD_ZMQ_CONTROL_CMD_SERVER_PORTS}"
echo "DEVICES=${DEVICES}"
echo "========================================================="
@@ -141,6 +144,9 @@ jobs:
-e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
-e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \
-e "FLASK_PORT=${FLASK_PORT}" \
-e "FD_ZMQ_RECV_REQUEST_SERVER_PORT=${FD_ZMQ_RECV_REQUEST_SERVER_PORT}" \
-e "FD_ZMQ_SEND_RESPONSE_SERVER_PORT=${FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" \
-e "FD_ZMQ_CONTROL_CMD_SERVER_PORTS=${FD_ZMQ_CONTROL_CMD_SERVER_PORTS}" \
-e "fd_wheel_url=${fd_wheel_url}" \
--gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c '
git config --global --add safe.directory /workspace/FastDeploy

View File

@@ -103,6 +103,8 @@ jobs:
FD_ENGINE_QUEUE_PORT=$((42058 + DEVICE_PORT * 100))
FD_METRICS_PORT=$((42078 + DEVICE_PORT * 100))
FD_CACHE_QUEUE_PORT=$((42098 + DEVICE_PORT * 100))
FD_ROUTER_PORT=$((42048 + DEVICE_PORT * 100))
FD_CONNECTOR_PORT=$((42038 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
@@ -110,6 +112,8 @@ jobs:
echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}"
echo "FD_METRICS_PORT=${FD_METRICS_PORT}"
echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}"
echo "FD_ROUTER_PORT=${FD_ROUTER_PORT}"
echo "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}"
echo "DEVICES=${DEVICES}"
echo "========================================================="
@@ -159,6 +163,8 @@ jobs:
-e "FD_METRICS_PORT=${FD_METRICS_PORT}" \
-e "FLASK_PORT=${FLASK_PORT}" \
-e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \
-e "FD_ROUTER_PORT=${FD_ROUTER_PORT}" \
-e "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" \
-e TZ="Asia/Shanghai" \
-e "fd_wheel_url=${fd_wheel_url}" \
-e "BASE_REF=${BASE_REF}" \

View File

@@ -17,7 +17,6 @@ import os
import re
import shutil
import signal
import socket
import subprocess
import sys
import time
@@ -26,49 +25,17 @@ import openai
import pytest
import requests
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234))
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)
# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
time.sleep(2)
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
@pytest.fixture(scope="session", autouse=True)

View File

@@ -90,38 +90,71 @@ def is_port_open(host: str, port: int, timeout=1.0):
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()
# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass
# Method 2: Use netstat and fuser as backup
try:
result = subprocess.run(
f"ps -ef -ww| grep {FD_CACHE_QUEUE_PORT} | grep -v grep", shell=True, capture_output=True, text=True
)
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split()
pid = int(parts[1]) # ps -ef 的第二列是 PID
print(f"Killing PID: {pid}")
os.kill(pid, signal.SIGKILL)
except Exception as e:
print(f"Failed to kill cache manager process: {e}")
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)
@pytest.fixture(scope="session", autouse=True)

View File

@@ -16,7 +16,6 @@ import json
import os
import re
import signal
import socket
import subprocess
import sys
import time
@@ -25,53 +24,21 @@ import openai
import pytest
import requests
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234))
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)
# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
os.environ["FD_USE_MACHETE"] = "0"
def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
time.sleep(2)
@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
"""

View File

@@ -17,7 +17,6 @@ import json
import os
import re
import signal
import socket
import subprocess
import sys
import time
@@ -27,49 +26,17 @@ import pytest
import requests
from jsonschema import validate
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)
# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
time.sleep(2)
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
@pytest.fixture(scope="session", autouse=True)

View File

@@ -16,7 +16,6 @@ import json
import os
import re
import signal
import socket
import subprocess
import sys
import time
@@ -25,47 +24,17 @@ import openai
import pytest
import requests
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)
# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT]
def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
@pytest.fixture(scope="session", autouse=True)
@@ -99,6 +68,8 @@ def setup_and_run_server():
str(FD_ENGINE_QUEUE_PORT),
"--metrics-port",
str(FD_METRICS_PORT),
"--cache-queue-port",
str(FD_CACHE_QUEUE_PORT),
"--enable-mm",
"--max-model-len",
"32768",

View File

@@ -15,7 +15,6 @@
import os
import re
import signal
import socket
import subprocess
import sys
import time
@@ -23,49 +22,17 @@ import time
import pytest
import requests
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, tests_dir)
# List of ports to clean before and after tests
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
"""
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except subprocess.CalledProcessError:
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
time.sleep(2)
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
@pytest.fixture(scope="session", autouse=True)

View File

@@ -16,20 +16,113 @@ import json
import os
import shutil
import signal
import socket
import subprocess
import sys
import time
import pytest
import requests
from utils.serving_utils import (
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
# List of ports to clean before and after tests
PORTS_TO_CLEAN = [
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
FD_CACHE_QUEUE_PORT,
FD_API_PORT + 1,
FD_ENGINE_QUEUE_PORT + 1,
FD_METRICS_PORT + 1,
FD_CACHE_QUEUE_PORT + 1,
]
def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()
# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass
# Method 2: Use netstat and fuser as backup
try:
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)
@pytest.fixture(scope="session", autouse=True)

View File

@@ -108,30 +108,71 @@ def get_registered_number(router_url) -> list:
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()
# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
current_pid = os.getpid()
parent_pid = os.getppid()
for pid in output.splitlines():
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass
# Method 2: Use netstat and fuser as backup
try:
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)
@pytest.fixture(scope="session", autouse=True)

View File

@@ -107,30 +107,71 @@ def get_registered_number(router_url) -> list:
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()
# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
current_pid = os.getpid()
parent_pid = os.getppid()
for pid in output.splitlines():
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass
# Method 2: Use netstat and fuser as backup
try:
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)
@pytest.fixture(scope="session", autouse=True)

View File

@@ -53,24 +53,71 @@ def is_port_open(host: str, port: int, timeout=1.0):
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses `lsof` to find process ids and sends SIGKILL.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()
# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass
# Method 2: Use netstat and fuser as backup
try:
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)
@pytest.fixture(scope="session", autouse=True)