diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index cb031fbb1..b9fad30e6 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -760,6 +760,9 @@ class EngineService: else: time.sleep(0.005) + except RuntimeError as e: + if "cannot schedule new futures after shutdown" in str(e): + break except Exception as e: err_msg = "Error happend while insert task to engine: {}, {}.".format(e, str(traceback.format_exc())) self.llm_logger.error(err_msg) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index f1a734f42..e4c0b717a 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -101,6 +101,25 @@ class LLMEngine: Initializes the engine and starts its sub-services. If `api_server_pid` is defined, will launch a thread to keep getting request from zmq_server. + + NOTE: To clarify the launch order of the components of the LLM engine: + 1. First, launch the splitwise scheduler (if necessary) and expert services (if necessary). + 2. Then, launch the common engine, which includes some background threads that insert tasks and receive outputs. + 3. Most importantly, launch workers and cache services. Their launch order is listed as follows. + + | Profile | Mixed | PrefixCache | Cache -> Worker | Worker -> Cache | + |---------|-------|-------------|-----------------|-----------------| + | 1 | 1 | 1 | 0 | 1 | + | 1 | 1 | 0 | 0 | 0 | + | 1 | 0 | 1 | 0 | 1 | + | 1 | 0 | 0 | 0 | 1 | + | 0 | 1 | 1 | 0 | 1 | + | 0 | 1 | 0 | 0 | 0 | + | 0 | 0 | 1 | 1 | 0 | + | 0 | 0 | 0 | 1 | 0 | + + 4. Finally, inform the user that the engine has successfully started. + """ assert not self.is_started, "The engine is already started." start_time = time.time() @@ -109,7 +128,6 @@ class LLMEngine: self.ipc_signal_suffix = self.cfg.parallel_config.engine_worker_queue_port[0] self._init_worker_signals() - # Launch components: scheduler, cache_manager, expert_service et.al. 
self.launch_components() self.engine.start() @@ -151,7 +169,7 @@ class LLMEngine: # and then start the cache manager if self.do_profile: self._stop_profile() - elif self.cfg.cache_config.enable_prefix_caching: + elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching: device_ids = self.cfg.parallel_config.device_ids.split(",") self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix) diff --git a/fastdeploy/inter_communicator/ipc_signal.py b/fastdeploy/inter_communicator/ipc_signal.py index 075f1a461..b79c91a21 100644 --- a/fastdeploy/inter_communicator/ipc_signal.py +++ b/fastdeploy/inter_communicator/ipc_signal.py @@ -80,6 +80,7 @@ class IPCSignal: name = name + f".{suffix}" if create: + llm_logger.debug(f"creating ipc signal: {name}") if shared_memory_exists(name): llm_logger.warning(f"ShareMemory: {name} already exists, delete it") SharedMemory(name=name, create=False).unlink() @@ -87,6 +88,7 @@ class IPCSignal: self.value: np.ndarray = np.ndarray(array.shape, dtype=array.dtype, buffer=self.shm.buf) self.value[:] = array # Initialize with input array data else: + llm_logger.debug(f"attaching ipc signal: {name}") self.shm = SharedMemory(name=name) self.value: np.ndarray = np.ndarray(array.shape, dtype=array.dtype, buffer=self.shm.buf) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 4ab9ccf4c..adcc24f41 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -41,7 +41,12 @@ from fastdeploy.config import ( StructuredOutputsConfig, ) from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue -from fastdeploy.inter_communicator import ExistTaskStatus, IPCSignal, ModelWeightsStatus +from fastdeploy.inter_communicator import ( + ExistTaskStatus, + IPCSignal, + ModelWeightsStatus, + shared_memory_exists, +) from fastdeploy.model_executor.layers.quantization import parse_quant_config 
from fastdeploy.model_executor.utils import v1_loader_support from fastdeploy.platforms import current_platform @@ -426,7 +431,7 @@ class PaddleDisWorkerProc: array=prefilled_step_idx_data, dtype=np.int32, suffix=gpu_id, - create=False, + create=not shared_memory_exists(prefilled_step_name), ) step_shm_value.value[0] = -1