From cd844979e9d1e1bed7ae7580ecef86e4ff6d01c4 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Wed, 17 Dec 2025 09:42:24 +0000
Subject: [PATCH] Remove unused connection_semaphore and fix manual release
 calls

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
---
 fastdeploy/entrypoints/openai/api_server.py | 58 +++++++++++++--------
 1 file changed, 36 insertions(+), 22 deletions(-)

diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index 94f01b8ba..3ec87dd18 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -108,8 +108,6 @@ if not _create_shm:
 # File-based lock for atomic operations on the shared counter
 connection_counter_lock = FileLock(f"/tmp/fd_api_server_conn_lock_{args.port}.lock")
 
-connection_semaphore = StatefulSemaphore(MAX_CONCURRENT_CONNECTIONS)
-
 
 class StandaloneApplication(BaseApplication):
     def __init__(self, app, options=None):
@@ -277,12 +275,11 @@ if tokens := [key for key in (args.api_key or env_tokens) if key]:
     app.add_middleware(AuthenticationMiddleware, tokens)
 
 
-@asynccontextmanager
-async def connection_manager():
+def acquire_connection():
     """
-    async context manager for connection manager using shared memory for global concurrency control across workers
+    Acquire a connection slot using shared memory for global concurrency control across workers.
+    Raises HTTPException if the global limit is reached.
     """
-    # Atomically increment and check the shared counter using file lock
     with connection_counter_lock:
         current_count = connection_counter_shm.value[0]
         if current_count >= MAX_CONCURRENT_CONNECTIONS:
@@ -295,13 +292,26 @@
             )
         # Increment the counter
         connection_counter_shm.value[0] = current_count + 1
-    
+
+
+def release_connection():
+    """
+    Release a connection slot by decrementing the shared counter.
+    """
+    with connection_counter_lock:
+        connection_counter_shm.value[0] -= 1
+
+
+@asynccontextmanager
+async def connection_manager():
+    """
+    async context manager for connection manager using shared memory for global concurrency control across workers
+    """
+    acquire_connection()
     try:
         yield
     finally:
-        # Decrement the counter on exit
-        with connection_counter_lock:
-            connection_counter_shm.value[0] -= 1
 
 
 # TODO 传递真实引擎值 通过pid 获取状态
@@ -387,15 +397,13 @@ def wrap_streaming_generator(original_generator: AsyncGenerator):
             # 尾包捕获
             if span is not None and span.is_recording() and count > 0:
                 span.add_event("last_chunk", {"time": last_time, "total_chunk": count})
-            api_server_logger.debug(f"release: {connection_semaphore.status()}")
-            connection_semaphore.release()
+            release_connection()
         else:
             try:
                 async for chunk in original_generator:
                     yield chunk
             finally:
-                api_server_logger.debug(f"release: {connection_semaphore.status()}")
-                connection_semaphore.release()
+                release_connection()
 
 
     return wrapped_generator
@@ -416,20 +424,22 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request):
     if not status:
         return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
     try:
-        async with connection_manager():
+        acquire_connection()
+        try:
             tracing.label_span(request)
             generator = await app.state.chat_handler.create_chat_completion(request)
             if isinstance(generator, ErrorResponse):
-                api_server_logger.debug(f"release: {connection_semaphore.status()}")
-                connection_semaphore.release()
+                release_connection()
                 return JSONResponse(content=generator.model_dump(), status_code=500)
             elif isinstance(generator, ChatCompletionResponse):
-                api_server_logger.debug(f"release: {connection_semaphore.status()}")
-                connection_semaphore.release()
+                release_connection()
                 return JSONResponse(content=generator.model_dump())
             else:
                 wrapped_generator = wrap_streaming_generator(generator)
                 return StreamingResponse(content=wrapped_generator(),
                                          media_type="text/event-stream")
+        except Exception:
+            release_connection()
+            raise
     except HTTPException as e:
         api_server_logger.error(f"Error in chat completion: {str(e)}")
@@ -452,18 +462,22 @@ async def create_completion(request: CompletionRequest, req: Request):
     if not status:
         return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
     try:
-        async with connection_manager():
+        acquire_connection()
+        try:
             tracing.label_span(request)
            generator = await app.state.completion_handler.create_completion(request)
            if isinstance(generator, ErrorResponse):
-                connection_semaphore.release()
+                release_connection()
                return JSONResponse(content=generator.model_dump(), status_code=500)
            elif isinstance(generator, CompletionResponse):
-                connection_semaphore.release()
+                release_connection()
                return JSONResponse(content=generator.model_dump())
            else:
                wrapped_generator = wrap_streaming_generator(generator)
                return StreamingResponse(content=wrapped_generator(),
                                         media_type="text/event-stream")
+        except Exception:
+            release_connection()
+            raise
     except HTTPException as e:
         return JSONResponse(status_code=e.status_code, content={"detail": e.detail})