From 9e4eb339b8def173078d75e82c3523d8c67759b2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 23 Dec 2025 12:15:24 +0000 Subject: [PATCH] Address code review feedback - Translate Chinese comments to English for consistency - Add subprocess import to api_server.py for TimeoutExpired handling - Improve signal name detection in worker_process.py using signal.Signals - Add better docstring comments for signal handlers and cleanup functions Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> --- fastdeploy/entrypoints/api_server.py | 20 +++++++++++++------- fastdeploy/entrypoints/openai/api_server.py | 20 ++++++++++---------- fastdeploy/worker/worker_process.py | 7 +++++-- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/fastdeploy/entrypoints/api_server.py b/fastdeploy/entrypoints/api_server.py index 1ba5eb8fa..d848cb916 100644 --- a/fastdeploy/entrypoints/api_server.py +++ b/fastdeploy/entrypoints/api_server.py @@ -17,6 +17,7 @@ import json import os import signal +import subprocess import traceback import uvicorn @@ -37,13 +38,13 @@ llm_engine = None def cleanup_engine(): - """清理引擎资源""" + """Clean up engine resources""" global llm_engine if llm_engine is not None: try: if hasattr(llm_engine, "worker_proc") and llm_engine.worker_proc is not None: try: - # 检查进程是否已经结束 + # Check if process has already terminated if llm_engine.worker_proc.poll() is not None: api_server_logger.info("Worker process already terminated") return @@ -51,9 +52,14 @@ def cleanup_engine(): pgid = os.getpgid(llm_engine.worker_proc.pid) api_server_logger.info(f"Terminating worker process group {pgid}") os.killpg(pgid, signal.SIGTERM) - # 等待进程结束 - llm_engine.worker_proc.wait(timeout=5) - api_server_logger.info("Worker process terminated successfully") + # Wait for process to terminate + try: + llm_engine.worker_proc.wait(timeout=5) + api_server_logger.info("Worker process terminated successfully") + except subprocess.TimeoutExpired: + api_server_logger.warning("Worker process did not terminate in time, sending SIGKILL") + os.killpg(pgid, signal.SIGKILL) + llm_engine.worker_proc.wait(timeout=2) except ProcessLookupError: api_server_logger.info("Worker process already terminated") except Exception as e: @@ -63,11 +69,11 @@ def cleanup_engine(): def signal_handler(signum, frame): - """处理SIGINT和SIGTERM信号""" + """Handle SIGINT and SIGTERM signals""" sig_name = "SIGINT" if signum == signal.SIGINT else "SIGTERM" api_server_logger.info(f"Received {sig_name}, initiating graceful shutdown...") cleanup_engine() - # 让uvicorn处理实际的退出 + # Let uvicorn handle the actual exit def init_app(args): diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index ac395bc51..8be298ccb 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -91,13 +91,13 @@ connection_semaphore = StatefulSemaphore(MAX_CONCURRENT_CONNECTIONS) def cleanup_processes(): - """清理所有子进程""" + """Clean up all subprocesses""" global llm_engine if llm_engine is not None: try: if hasattr(llm_engine, "worker_proc") and llm_engine.worker_proc is not None: try: - # 检查进程是否已经结束 + # Check if process has already terminated if llm_engine.worker_proc.poll() is not None: api_server_logger.info("Worker process already terminated") return @@ -105,7 +105,7 @@ def cleanup_processes(): pgid = os.getpgid(llm_engine.worker_proc.pid) api_server_logger.info(f"Terminating worker process group {pgid}") os.killpg(pgid, signal.SIGTERM) - # 等待进程结束,如果超时则强制杀死 + # Wait for process to terminate, force kill if timeout try: llm_engine.worker_proc.wait(timeout=5) api_server_logger.info("Worker process terminated successfully") @@ -134,12 +134,12 @@ def cleanup_processes(): def signal_handler(signum, frame): - """处理SIGINT和SIGTERM信号""" + """Handle SIGINT and SIGTERM signals""" sig_name = "SIGINT" if signum == signal.SIGINT else "SIGTERM" api_server_logger.info(f"Received {sig_name}, initiating graceful shutdown...") shutdown_event.set() cleanup_processes() - # 不在这里退出,让gunicorn处理 + # Don't exit here, let gunicorn handle it class StandaloneApplication(BaseApplication): @@ -157,24 +157,24 @@ class StandaloneApplication(BaseApplication): return self.application def run(self): - """重写run方法以添加信号处理""" - # 在主进程中设置信号处理器 + """Override run method to add signal handling""" + # Set signal handlers in the main process signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: super().run() finally: - # 确保退出时清理所有资源 + # Ensure cleanup on exit cleanup_processes() # Gunicorn hooks def worker_exit(self, server, worker): - """当worker进程退出时调用""" + """Called when a worker process exits""" api_server_logger.info(f"Worker {worker.pid} exiting") def on_exit(self, server): - """当Arbiter退出时调用""" + """Called when the Arbiter exits""" api_server_logger.info("Gunicorn master process exiting, cleaning up...") cleanup_processes() diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 37bd7b44e..da8377626 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -1034,9 +1034,12 @@ def run_worker_proc() -> None: """ start worker process """ - # 设置信号处理器以优雅退出 + # Set signal handlers for graceful exit def signal_handler(signum, frame): - sig_name = "SIGINT" if signum == signal.SIGINT else "SIGTERM" + try: + sig_name = signal.Signals(signum).name + except (ValueError, AttributeError): + sig_name = f"Signal({signum})" logger.info(f"Worker process received {sig_name}, shutting down gracefully...") sys.exit(0)