From 041a361f8a5cb3ca638d65fc1b666fbe4bbf2a63 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 17 Dec 2025 09:45:57 +0000 Subject: [PATCH] Address code review feedback: move imports, fix race condition, improve exception handling Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> --- fastdeploy/entrypoints/openai/api_server.py | 39 +++++++++++++-------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 3ec87dd18..c2bc6a23d 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -23,6 +23,7 @@ import traceback from collections.abc import AsyncGenerator from contextlib import asynccontextmanager +import numpy as np import uvicorn import zmq from fastapi import FastAPI, HTTPException, Request @@ -91,19 +92,28 @@ MAX_CONCURRENT_CONNECTIONS = args.max_concurrency # Use shared memory for concurrency control across multiple workers # Create or connect to a shared counter that tracks active connections globally -import numpy as np _shm_name = f"fd_api_server_connections_{args.port}" _create_shm = not shared_memory_exists(_shm_name) -connection_counter_shm = IPCSignal( - name=_shm_name, - array=np.array([0], dtype=np.int32) if _create_shm else None, - dtype=np.int32 if _create_shm else None, - create=_create_shm, - shm_size=4 if not _create_shm else None, -) -if not _create_shm: - # Attach to existing shared memory - connection_counter_shm.value = np.ndarray((1,), dtype=np.int32, buffer=connection_counter_shm.shm.buf) +try: + connection_counter_shm = IPCSignal( + name=_shm_name, + array=np.array([0], dtype=np.int32) if _create_shm else None, + dtype=np.int32 if _create_shm else None, + create=_create_shm, + shm_size=4 if not _create_shm else None, + ) + if not _create_shm: + # Attach to existing shared memory + connection_counter_shm.value = np.ndarray((1,), dtype=np.int32, buffer=connection_counter_shm.shm.buf) +except Exception as e: + # If there's a race condition or other error, try creating fresh + api_server_logger.warning(f"Failed to attach to existing shared memory, creating new: {e}") + connection_counter_shm = IPCSignal( + name=_shm_name, + array=np.array([0], dtype=np.int32), + dtype=np.int32, + create=True, + ) # File-based lock for atomic operations on the shared counter connection_counter_lock = FileLock(f"/tmp/fd_api_server_conn_lock_{args.port}.lock") @@ -424,8 +434,8 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request): if not status: return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304) try: - acquire_connection() try: + acquire_connection() tracing.label_span(request) generator = await app.state.chat_handler.create_chat_completion(request) if isinstance(generator, ErrorResponse): @@ -438,9 +448,9 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request): wrapped_generator = wrap_streaming_generator(generator) return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream") except Exception: + # Release connection on any exception (including HTTPException from acquire_connection) release_connection() raise - except HTTPException as e: api_server_logger.error(f"Error in chat completion: {str(e)}") return JSONResponse(status_code=e.status_code, content={"detail": e.detail}) @@ -462,8 +472,8 @@ async def create_completion(request: CompletionRequest, req: Request): if not status: return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304) try: - acquire_connection() try: + acquire_connection() tracing.label_span(request) generator = await app.state.completion_handler.create_completion(request) if isinstance(generator, ErrorResponse): @@ -476,6 +486,7 @@ async def create_completion(request: CompletionRequest, req: Request): wrapped_generator = wrap_streaming_generator(generator) return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream") except Exception: + # Release connection on any exception (including HTTPException from acquire_connection) release_connection() raise except HTTPException as e: