mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
Fix connection release logic and add bounds checking to prevent negative counter
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
@@ -103,7 +103,8 @@ try:
|
||||
shm_size=4 if not _create_shm else None,
|
||||
)
|
||||
if not _create_shm:
|
||||
# Attach to existing shared memory
|
||||
# Attach to existing shared memory by creating numpy array view of the buffer
|
||||
# This is necessary because IPCSignal may not initialize .value when create=False
|
||||
connection_counter_shm.value = np.ndarray((1,), dtype=np.int32, buffer=connection_counter_shm.shm.buf)
|
||||
except Exception as e:
|
||||
# If there's a race condition or other error, try creating fresh
|
||||
@@ -307,9 +308,13 @@ def acquire_connection():
|
||||
def release_connection():
    """
    Release a connection slot by decrementing the shared counter.

    The update is performed while holding ``connection_counter_lock``. The
    counter is decremented at most once per call, and only when it is
    currently greater than 0; otherwise it is left untouched and a warning is
    logged, so the counter cannot go negative.
    """
    with connection_counter_lock:
        # Single, guarded decrement: an unconditional decrement ahead of this
        # check would release two slots per call and could drive the counter
        # below zero.
        if connection_counter_shm.value[0] > 0:
            connection_counter_shm.value[0] -= 1
        else:
            api_server_logger.warning("Attempted to release connection when counter is already 0")
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@@ -433,23 +438,35 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request):
|
||||
status, msg = app.state.engine_client.is_workers_alive()
|
||||
if not status:
|
||||
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
|
||||
try:
|
||||
acquire_connection()
|
||||
connection_acquired = True
|
||||
except HTTPException as e:
|
||||
# If acquire fails with 429, connection was not acquired
|
||||
api_server_logger.error(f"Error in chat completion: {str(e)}")
|
||||
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
|
||||
|
||||
try:
|
||||
try:
|
||||
acquire_connection()
|
||||
tracing.label_span(request)
|
||||
generator = await app.state.chat_handler.create_chat_completion(request)
|
||||
if isinstance(generator, ErrorResponse):
|
||||
release_connection()
|
||||
connection_acquired = False
|
||||
return JSONResponse(content=generator.model_dump(), status_code=500)
|
||||
elif isinstance(generator, ChatCompletionResponse):
|
||||
release_connection()
|
||||
connection_acquired = False
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
else:
|
||||
# For streaming, release happens in wrap_streaming_generator
|
||||
connection_acquired = False
|
||||
wrapped_generator = wrap_streaming_generator(generator)
|
||||
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
|
||||
except Exception:
|
||||
# Release connection on any exception (including HTTPException from acquire_connection)
|
||||
release_connection()
|
||||
# Release connection only if it was acquired
|
||||
if connection_acquired:
|
||||
release_connection()
|
||||
raise
|
||||
except HTTPException as e:
|
||||
api_server_logger.error(f"Error in chat completion: {str(e)}")
|
||||
@@ -471,23 +488,34 @@ async def create_completion(request: CompletionRequest, req: Request):
|
||||
status, msg = app.state.engine_client.is_workers_alive()
|
||||
if not status:
|
||||
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
|
||||
try:
|
||||
acquire_connection()
|
||||
connection_acquired = True
|
||||
except HTTPException as e:
|
||||
# If acquire fails with 429, connection was not acquired
|
||||
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
|
||||
|
||||
try:
|
||||
try:
|
||||
acquire_connection()
|
||||
tracing.label_span(request)
|
||||
generator = await app.state.completion_handler.create_completion(request)
|
||||
if isinstance(generator, ErrorResponse):
|
||||
release_connection()
|
||||
connection_acquired = False
|
||||
return JSONResponse(content=generator.model_dump(), status_code=500)
|
||||
elif isinstance(generator, CompletionResponse):
|
||||
release_connection()
|
||||
connection_acquired = False
|
||||
return JSONResponse(content=generator.model_dump())
|
||||
else:
|
||||
# For streaming, release happens in wrap_streaming_generator
|
||||
connection_acquired = False
|
||||
wrapped_generator = wrap_streaming_generator(generator)
|
||||
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
|
||||
except Exception:
|
||||
# Release connection on any exception (including HTTPException from acquire_connection)
|
||||
release_connection()
|
||||
# Release connection only if it was acquired
|
||||
if connection_acquired:
|
||||
release_connection()
|
||||
raise
|
||||
except HTTPException as e:
|
||||
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
|
||||
|
||||
Reference in New Issue
Block a user