diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py
index 196ada672..e42681820 100644
--- a/fastdeploy/engine/engine.py
+++ b/fastdeploy/engine/engine.py
@@ -376,6 +376,7 @@ class LLMEngine:
         exit sub services
         """
         self.running = False
+        llm_logger.info("Engine shut down, exiting sub services...")
 
         if hasattr(self, "cache_manager_processes"):
             self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear()
@@ -394,6 +395,7 @@ class LLMEngine:
 
         if hasattr(self, "get_profile_block_num_signal"):
             self.get_profile_block_num_signal.clear()
+
         if hasattr(self, "worker_proc") and self.worker_proc is not None:
             try:
                 pgid = os.getpgid(self.worker_proc.pid)
@@ -403,6 +405,7 @@ class LLMEngine:
 
         if hasattr(self, "zmq_server") and self.zmq_server is not None:
             self.zmq_server.close()
+
         if hasattr(self, "dp_processed"):
             for p in self.dp_processed:
                 console_logger.info(f"Waiting for worker {p.pid} to exit")
diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index 884a8fc9b..695107fc7 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -17,6 +17,7 @@
 import asyncio
 import json
 import os
+import signal
 import threading
 import time
 import traceback
@@ -649,6 +650,27 @@ def launch_controller_server():
         time.sleep(1)
 
 
+def launch_worker_monitor():
+    """
+    Detect whether the worker process is alive. If not, stop the API server by triggering llm_engine.
+    """
+
+    def _monitor():
+        global llm_engine
+        while True:
+            if hasattr(llm_engine, "worker_proc") and llm_engine.worker_proc.poll() is not None:
+                console_logger.error(
+                    f"Worker process has died in the background (code={llm_engine.worker_proc.returncode}). API server is forced to stop."
+                )
+                os.kill(os.getpid(), signal.SIGINT)
+                break
+            time.sleep(5)
+
+    worker_monitor_thread = threading.Thread(target=_monitor, daemon=True)
+    worker_monitor_thread.start()
+    time.sleep(1)
+
+
 def main():
     """main function"""
     if args.local_data_parallel_id == 0:
@@ -662,6 +684,7 @@ def main():
     console_logger.info(f"Launching chat completion service at http://{args.host}:{args.port}/v1/chat/completions")
     console_logger.info(f"Launching completion service at http://{args.host}:{args.port}/v1/completions")
 
+    launch_worker_monitor()
    launch_controller_server()
     launch_metrics_server()
     launch_api_server()
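
Note: the monitor added in `api_server.py` follows a common pattern: a daemon thread polls the worker subprocess with `Popen.poll()` and, once the worker has exited, sends SIGINT to the API server's own PID so the server's normal shutdown path (including the engine cleanup touched in `engine.py`) still runs. Below is a minimal, self-contained sketch of that pattern for reference only; the dummy worker command, the 1-second poll interval, and the `print`-based logging are placeholders rather than FastDeploy's actual code, and the SIGINT delivery assumes a POSIX system.

```python
import os
import signal
import subprocess
import sys
import threading
import time

# Placeholder worker: a short-lived child process standing in for llm_engine.worker_proc.
worker_proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(3)"])


def _monitor(proc: subprocess.Popen, poll_interval: float) -> None:
    """Poll the worker; once it exits, interrupt the main process so it can shut down cleanly."""
    while True:
        if proc.poll() is not None:  # poll() returns the exit code once the child has died
            print(f"Worker died (code={proc.returncode}); sending SIGINT to the server process.")
            os.kill(os.getpid(), signal.SIGINT)  # behaves like Ctrl+C for this process (POSIX)
            break
        time.sleep(poll_interval)


threading.Thread(target=_monitor, args=(worker_proc, 1.0), daemon=True).start()

try:
    # Stand-in for the blocking server loop (uvicorn in the real api_server.py).
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("SIGINT received; running shutdown and exiting.")
```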