mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
Remove unused connection_semaphore and fix manual release calls
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
@@ -108,8 +108,6 @@ if not _create_shm:
|
||||
# File-based lock for atomic operations on the shared counter
|
||||
connection_counter_lock = FileLock(f"/tmp/fd_api_server_conn_lock_{args.port}.lock")
|
||||
|
||||
connection_semaphore = StatefulSemaphore(MAX_CONCURRENT_CONNECTIONS)
|
||||
|
||||
|
||||
class StandaloneApplication(BaseApplication):
|
||||
def __init__(self, app, options=None):
|
||||
@@ -277,12 +275,11 @@ if tokens := [key for key in (args.api_key or env_tokens) if key]:
|
||||
app.add_middleware(AuthenticationMiddleware, tokens)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def connection_manager():
|
||||
def acquire_connection():
|
||||
"""
|
||||
async context manager for connection manager using shared memory for global concurrency control across workers
|
||||
Acquire a connection slot using shared memory for global concurrency control across workers.
|
||||
Raises HTTPException if the global limit is reached.
|
||||
"""
|
||||
# Atomically increment and check the shared counter using file lock
|
||||
with connection_counter_lock:
|
||||
current_count = connection_counter_shm.value[0]
|
||||
if current_count >= MAX_CONCURRENT_CONNECTIONS:
|
||||
@@ -295,13 +292,26 @@ async def connection_manager():
|
||||
)
|
||||
# Increment the counter
|
||||
connection_counter_shm.value[0] = current_count + 1
|
||||
|
||||
|
||||
|
||||
def release_connection():
    """Free one global connection slot.

    Decrements the cross-worker connection counter kept in shared
    memory. The file-based lock makes the read-modify-write atomic
    across all worker processes.
    """
    with connection_counter_lock:
        occupied = connection_counter_shm.value[0]
        connection_counter_shm.value[0] = occupied - 1
|
||||
|
||||
|
||||
@asynccontextmanager
async def connection_manager():
    """Async context manager enforcing the global concurrency limit.

    Acquires a connection slot from the shared-memory counter (shared
    across all worker processes) on entry and releases it on exit.
    Raises HTTPException (from acquire_connection) when the global
    limit is reached, in which case nothing is released.
    """
    acquire_connection()
    try:
        yield
    finally:
        # Release exactly once via the helper. The previous inline
        # decrement of connection_counter_shm here, combined with
        # release_connection(), double-decremented the counter and
        # let the server exceed MAX_CONCURRENT_CONNECTIONS.
        release_connection()
|
||||
|
||||
|
||||
# TODO: pass the real engine value; obtain the status via the pid
|
||||
@@ -387,15 +397,13 @@ def wrap_streaming_generator(original_generator: AsyncGenerator):
|
||||
# 尾包捕获
|
||||
if span is not None and span.is_recording() and count > 0:
|
||||
span.add_event("last_chunk", {"time": last_time, "total_chunk": count})
|
||||
api_server_logger.debug(f"release: {connection_semaphore.status()}")
|
||||
connection_semaphore.release()
|
||||
release_connection()
|
||||
else:
|
||||
try:
|
||||
async for chunk in original_generator:
|
||||
yield chunk
|
||||
finally:
|
||||
api_server_logger.debug(f"release: {connection_semaphore.status()}")
|
||||
connection_semaphore.release()
|
||||
release_connection()
|
||||
|
||||
return wrapped_generator
|
||||
|
||||
@@ -416,20 +424,22 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request):
|
||||
if not status:
|
||||
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
|
||||
try:
|
||||
async with connection_manager():
|
||||
acquire_connection()
|
||||
try:
|
||||
tracing.label_span(request)
|
||||
generator = await app.state.chat_handler.create_chat_completion(request)
|
||||
if isinstance(generator, ErrorResponse):
|
||||
api_server_logger.debug(f"release: {connection_semaphore.status()}")
|
||||
connection_semaphore.release()
|
||||
release_connection()
|
||||
return JSONResponse(content=generator.model_dump(), status_code=500)
|
||||
elif isinstance(generator, ChatCompletionResponse):
|
||||
api_server_logger.debug(f"release: {connection_semaphore.status()}")
|
||||
connection_semaphore.release()
|
||||
release_connection()
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
else:
|
||||
wrapped_generator = wrap_streaming_generator(generator)
|
||||
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
|
||||
except Exception:
|
||||
release_connection()
|
||||
raise
|
||||
|
||||
except HTTPException as e:
|
||||
api_server_logger.error(f"Error in chat completion: {str(e)}")
|
||||
@@ -452,18 +462,22 @@ async def create_completion(request: CompletionRequest, req: Request):
|
||||
if not status:
|
||||
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
|
||||
try:
|
||||
async with connection_manager():
|
||||
acquire_connection()
|
||||
try:
|
||||
tracing.label_span(request)
|
||||
generator = await app.state.completion_handler.create_completion(request)
|
||||
if isinstance(generator, ErrorResponse):
|
||||
connection_semaphore.release()
|
||||
release_connection()
|
||||
return JSONResponse(content=generator.model_dump(), status_code=500)
|
||||
elif isinstance(generator, CompletionResponse):
|
||||
connection_semaphore.release()
|
||||
release_connection()
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
else:
|
||||
wrapped_generator = wrap_streaming_generator(generator)
|
||||
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
|
||||
except Exception:
|
||||
release_connection()
|
||||
raise
|
||||
except HTTPException as e:
|
||||
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user