Address code review feedback: move imports, fix race condition, improve exception handling

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-17 09:45:57 +00:00
parent cd844979e9
commit 041a361f8a

View File

@@ -23,6 +23,7 @@ import traceback
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import numpy as np
import uvicorn
import zmq
from fastapi import FastAPI, HTTPException, Request
@@ -91,19 +92,28 @@ MAX_CONCURRENT_CONNECTIONS = args.max_concurrency
# Use shared memory for concurrency control across multiple workers
# Create or connect to a shared counter that tracks active connections globally
import numpy as np
_shm_name = f"fd_api_server_connections_{args.port}"
_create_shm = not shared_memory_exists(_shm_name)
connection_counter_shm = IPCSignal(
name=_shm_name,
array=np.array([0], dtype=np.int32) if _create_shm else None,
dtype=np.int32 if _create_shm else None,
create=_create_shm,
shm_size=4 if not _create_shm else None,
)
if not _create_shm:
# Attach to existing shared memory
connection_counter_shm.value = np.ndarray((1,), dtype=np.int32, buffer=connection_counter_shm.shm.buf)
try:
connection_counter_shm = IPCSignal(
name=_shm_name,
array=np.array([0], dtype=np.int32) if _create_shm else None,
dtype=np.int32 if _create_shm else None,
create=_create_shm,
shm_size=4 if not _create_shm else None,
)
if not _create_shm:
# Attach to existing shared memory
connection_counter_shm.value = np.ndarray((1,), dtype=np.int32, buffer=connection_counter_shm.shm.buf)
except Exception as e:
# If there's a race condition or other error, try creating fresh
api_server_logger.warning(f"Failed to attach to existing shared memory, creating new: {e}")
connection_counter_shm = IPCSignal(
name=_shm_name,
array=np.array([0], dtype=np.int32),
dtype=np.int32,
create=True,
)
# File-based lock for atomic operations on the shared counter
connection_counter_lock = FileLock(f"/tmp/fd_api_server_conn_lock_{args.port}.lock")
@@ -424,8 +434,8 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request):
if not status:
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
try:
acquire_connection()
try:
acquire_connection()
tracing.label_span(request)
generator = await app.state.chat_handler.create_chat_completion(request)
if isinstance(generator, ErrorResponse):
@@ -438,9 +448,9 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request):
wrapped_generator = wrap_streaming_generator(generator)
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
except Exception:
# Release connection on any exception (including HTTPException from acquire_connection)
release_connection()
raise
except HTTPException as e:
api_server_logger.error(f"Error in chat completion: {str(e)}")
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
@@ -462,8 +472,8 @@ async def create_completion(request: CompletionRequest, req: Request):
if not status:
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
try:
acquire_connection()
try:
acquire_connection()
tracing.label_span(request)
generator = await app.state.completion_handler.create_completion(request)
if isinstance(generator, ErrorResponse):
@@ -476,6 +486,7 @@ async def create_completion(request: CompletionRequest, req: Request):
wrapped_generator = wrap_streaming_generator(generator)
return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
except Exception:
# Release connection on any exception (including HTTPException from acquire_connection)
release_connection()
raise
except HTTPException as e: