Address code review feedback: improve IPCSignal initialization, remove unused function, fix formatting

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-17 10:56:26 +00:00
parent fa43c5f83e
commit 6f9b25902a

View File

@@ -63,7 +63,6 @@ from fastdeploy.entrypoints.openai.utils import UVICORN_CONFIG, make_arg_parser
from fastdeploy.envs import environment_variables
from fastdeploy.metrics.metrics import get_filtered_metrics
from filelock import FileLock
from fastdeploy.inter_communicator import IPCSignal, shared_memory_exists
from fastdeploy.utils import (
ExceptionHandler,
@@ -95,20 +94,36 @@ MAX_CONCURRENT_CONNECTIONS = args.max_concurrency
_shm_name = f"fd_api_server_connections_{args.port}"
_create_shm = not shared_memory_exists(_shm_name)
try:
if _create_shm:
# Create new shared memory
connection_counter_shm = IPCSignal(
name=_shm_name,
array=np.array([0], dtype=np.int32),
dtype=np.int32,
create=True,
)
else:
# Attach to existing shared memory with proper array shape and dtype
connection_counter_shm = IPCSignal(
name=_shm_name,
array=np.array([0], dtype=np.int32),
dtype=np.int32,
create=False,
)
except FileExistsError:
# Race condition: another worker created it between our check and creation
# Attach to the newly created shared memory
api_server_logger.info(f"Shared memory already exists, attaching to it")
connection_counter_shm = IPCSignal(
name=_shm_name,
array=np.array([0], dtype=np.int32) if _create_shm else None,
dtype=np.int32 if _create_shm else None,
create=_create_shm,
shm_size=4 if not _create_shm else None,
array=np.array([0], dtype=np.int32),
dtype=np.int32,
create=False,
)
if not _create_shm:
# Attach to existing shared memory by creating numpy array view of the buffer
# This is necessary because IPCSignal may not initialize .value when create=False
connection_counter_shm.value = np.ndarray((1,), dtype=np.int32, buffer=connection_counter_shm.shm.buf)
except Exception as e:
# If there's a race condition or other error, try creating fresh
api_server_logger.warning(f"Failed to attach to existing shared memory, creating new: {e}")
except FileNotFoundError:
# Race condition: shared memory was deleted between our check and attachment
# Create new shared memory
api_server_logger.warning(f"Shared memory disappeared, creating new")
connection_counter_shm = IPCSignal(
name=_shm_name,
array=np.array([0], dtype=np.int32),
@@ -289,7 +304,12 @@ if tokens := [key for key in (args.api_key or env_tokens) if key]:
def acquire_connection():
"""
Acquire a connection slot using shared memory for global concurrency control across workers.
Raises HTTPException if the global limit is reached.
This function is thread-safe and uses a file-based lock for synchronization across processes.
It will block while acquiring the lock, which may impact performance under high load.
Raises:
HTTPException: With status 429 if the global connection limit is reached.
"""
with connection_counter_lock:
current_count = connection_counter_shm.value[0]
@@ -298,7 +318,7 @@ def acquire_connection():
f"Reached max request concurrency: {current_count}/{MAX_CONCURRENT_CONNECTIONS}"
)
raise HTTPException(
status_code=429,
status_code=429,
detail=f"Too many requests, current max concurrency is {args.max_concurrency}"
)
# Increment the counter
@@ -317,16 +337,7 @@ def release_connection():
api_server_logger.warning("Attempted to release connection when counter is already 0")
@asynccontextmanager
async def connection_manager():
"""
async context manager for connection manager using shared memory for global concurrency control across workers
"""
acquire_connection()
try:
yield
finally:
release_connection()
# TODO 传递真实引擎值 通过pid 获取状态
@@ -443,9 +454,8 @@ async def create_chat_completion(request: ChatCompletionRequest, req: Request):
connection_acquired = True
except HTTPException as e:
# If acquire fails with 429, connection was not acquired
api_server_logger.error(f"Error in chat completion: {str(e)}")
api_server_logger.error(f"Failed to acquire connection slot for chat completion: {str(e)}")
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
try:
try:
tracing.label_span(request)
@@ -494,7 +504,6 @@ async def create_completion(request: CompletionRequest, req: Request):
except HTTPException as e:
# If acquire fails with 429, connection was not acquired
return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
try:
try:
tracing.label_span(request)