diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index 77576441c..663806d6b 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -63,7 +63,6 @@ from fastdeploy.entrypoints.openai.utils import UVICORN_CONFIG, make_arg_parser
 from fastdeploy.envs import environment_variables
 from fastdeploy.metrics.metrics import get_filtered_metrics
 from filelock import FileLock
-
 from fastdeploy.inter_communicator import IPCSignal, shared_memory_exists
 from fastdeploy.utils import (
     ExceptionHandler,
@@ -95,20 +94,36 @@ MAX_CONCURRENT_CONNECTIONS = args.max_concurrency
 _shm_name = f"fd_api_server_connections_{args.port}"
 _create_shm = not shared_memory_exists(_shm_name)
 try:
+    if _create_shm:
+        # Create new shared memory
+        connection_counter_shm = IPCSignal(
+            name=_shm_name,
+            array=np.array([0], dtype=np.int32),
+            dtype=np.int32,
+            create=True,
+        )
+    else:
+        # Attach to existing shared memory with proper array shape and dtype
+        connection_counter_shm = IPCSignal(
+            name=_shm_name,
+            array=np.array([0], dtype=np.int32),
+            dtype=np.int32,
+            create=False,
+        )
+except FileExistsError:
+    # Race condition: another worker created it between our check and creation
+    # Attach to the newly created shared memory
+    api_server_logger.info("Shared memory already exists, attaching to it")
     connection_counter_shm = IPCSignal(
         name=_shm_name,
-        array=np.array([0], dtype=np.int32) if _create_shm else None,
-        dtype=np.int32 if _create_shm else None,
-        create=_create_shm,
-        shm_size=4 if not _create_shm else None,
+        array=np.array([0], dtype=np.int32),
+        dtype=np.int32,
+        create=False,
     )
-    if not _create_shm:
-        # Attach to existing shared memory by creating numpy array view of the buffer
-        # This is necessary because IPCSignal may not initialize .value when create=False
-        connection_counter_shm.value = np.ndarray((1,), dtype=np.int32, buffer=connection_counter_shm.shm.buf)
-except Exception as e:
-    # If there's a race condition or other error, try creating fresh
-    api_server_logger.warning(f"Failed to attach to existing shared memory, creating new: {e}")
+except FileNotFoundError:
+    # Race condition: shared memory was deleted between our check and attachment
+    # Create new shared memory
+    api_server_logger.warning("Shared memory disappeared, creating new")
     connection_counter_shm = IPCSignal(
         name=_shm_name,
         array=np.array([0], dtype=np.int32),
@@ -289,7 +304,12 @@ if tokens := [key for key in (args.api_key or env_tokens) if key]:
 def acquire_connection():
     """
     Acquire a connection slot using shared memory for global concurrency control across workers.
-    Raises HTTPException if the global limit is reached.
+
+    This function is thread-safe and uses a file-based lock for synchronization across processes.
+    It will block while acquiring the lock, which may impact performance under high load.
+
+    Raises:
+        HTTPException: With status 429 if the global connection limit is reached.
     """
     with connection_counter_lock:
         current_count = connection_counter_shm.value[0]
@@ -298,7 +318,7 @@ def acquire_connection():
                 f"Reached max request concurrency: {current_count}/{MAX_CONCURRENT_CONNECTIONS}"
             )
             raise HTTPException(
-                status_code=429,
+                status_code=429,
                 detail=f"Too many requests, current max concurrency is {args.max_concurrency}"
             )
         # Increment the counter
@@ -317,16 +337,7 @@ def release_connection():
             api_server_logger.warning("Attempted to release connection when counter is already 0")


-@asynccontextmanager
-async def connection_manager():
-    """
-    async context manager for connection manager using shared memory for global concurrency control across workers
-    """
-    acquire_connection()
-    try:
-        yield
-    finally:
-        release_connection()
+


 # TODO 传递真实引擎值 通过pid 获取状态
@@ -443,9 +454,8 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request):
         connection_acquired = True
     except HTTPException as e:
         # If acquire fails with 429, connection was not acquired
-        api_server_logger.error(f"Error in chat completion: {str(e)}")
+        api_server_logger.error(f"Failed to acquire connection slot for chat completion: {str(e)}")
         return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
-
     try:
         try:
             tracing.label_span(request)
@@ -494,7 +504,6 @@ async def create_completion(request: CompletionRequest, req: Request):
     except HTTPException as e:
         # If acquire fails with 429, connection was not acquired
         return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
-
     try:
         try:
             tracing.label_span(request)