From bca8905b406bc026bf4b7d823b8ec3a403b49d57 Mon Sep 17 00:00:00 2001
From: ltd0924 <32387785+ltd0924@users.noreply.github.com>
Date: Tue, 19 Aug 2025 13:51:38 +0800
Subject: [PATCH] [BugFix] fix control signal release failed (#3390)

* [BugFix] fix control signal release failed

* [BugFix] fix control signal release failed

* update

* update

* update
---
 fastdeploy/entrypoints/openai/api_server.py   | 10 +--
 fastdeploy/entrypoints/openai/serving_chat.py | 68 +++++++++----------
 .../entrypoints/openai/serving_completion.py  | 16 ++---
 fastdeploy/inter_communicator/zmq_client.py   | 12 +++-
 4 files changed, 58 insertions(+), 48 deletions(-)

diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index ee5f0d62a..98bb071d3 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -175,10 +175,10 @@ async def connection_manager():
         await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001)
         yield
     except asyncio.TimeoutError:
-        api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}")
-        if connection_semaphore.locked():
-            connection_semaphore.release()
-        raise HTTPException(status_code=429, detail="Too many requests")
+        api_server_logger.info(f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}")
+        raise HTTPException(
+            status_code=429, detail=f"Too many requests,current max concurrency is {args.max_concurrency}"
+        )
 
 
 # TODO 传递真实引擎值 通过pid 获取状态
@@ -265,9 +265,11 @@ async def create_chat_completion(request: ChatCompletionRequest):
             inject_to_metadata(request)
             generator = await app.state.chat_handler.create_chat_completion(request)
             if isinstance(generator, ErrorResponse):
+                api_server_logger.debug(f"release: {connection_semaphore.status()}")
                 connection_semaphore.release()
                 return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code)
             elif isinstance(generator, ChatCompletionResponse):
+                api_server_logger.debug(f"release: {connection_semaphore.status()}")
                 connection_semaphore.release()
                 return JSONResponse(content=generator.model_dump())
             else:
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index d52433c0d..e94171970 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -78,47 +78,47 @@ class OpenAIServingChat:
             err_msg = f"Only master node can accept completion request, please send request to master node: {self.pod_ips[0]}"
             api_server_logger.error(err_msg)
             return ErrorResponse(message=err_msg, code=400)
-
-        if request.user is not None:
-            request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
-        else:
-            request_id = f"chatcmpl-{uuid.uuid4()}"
-        api_server_logger.info(f"create chat completion request: {request_id}")
-        text_after_process = None
         try:
-            current_req_dict = request.to_dict_for_infer(request_id)
-            if "chat_template" not in current_req_dict:
-                current_req_dict["chat_template"] = self.chat_template
-            current_req_dict["arrival_time"] = time.time()
-            prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
-            text_after_process = current_req_dict.get("text_after_process")
-            if isinstance(prompt_token_ids, np.ndarray):
-                prompt_token_ids = prompt_token_ids.tolist()
-        except Exception as e:
-            return ErrorResponse(code=400, message=str(e))
-
-        del current_req_dict
-        try:
-            api_server_logger.debug(f"{self.engine_client.semaphore.status()}")
             if self.max_waiting_time < 0:
                 await self.engine_client.semaphore.acquire()
             else:
                 await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
-        except Exception:
-            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+            api_server_logger.info(f"current {self.engine_client.semaphore.status()}")
 
-        if request.stream:
-            return self.chat_completion_stream_generator(
-                request, request_id, request.model, prompt_token_ids, text_after_process
-            )
-        else:
+            if request.user is not None:
+                request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
+            else:
+                request_id = f"chatcmpl-{uuid.uuid4()}"
+            api_server_logger.info(f"create chat completion request: {request_id}")
+            text_after_process = None
             try:
-                return await self.chat_completion_full_generator(
-                    request, request_id, request.model, prompt_token_ids, text_after_process
-                )
+                current_req_dict = request.to_dict_for_infer(request_id)
+                if "chat_template" not in current_req_dict:
+                    current_req_dict["chat_template"] = self.chat_template
+                current_req_dict["arrival_time"] = time.time()
+                prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
+                text_after_process = current_req_dict.get("text_after_process")
+                if isinstance(prompt_token_ids, np.ndarray):
+                    prompt_token_ids = prompt_token_ids.tolist()
             except Exception as e:
                 return ErrorResponse(code=400, message=str(e))
+            del current_req_dict
+
+            if request.stream:
+                return self.chat_completion_stream_generator(
+                    request, request_id, request.model, prompt_token_ids, text_after_process
+                )
+            else:
+                try:
+                    return await self.chat_completion_full_generator(
+                        request, request_id, request.model, prompt_token_ids, text_after_process
+                    )
+                except Exception as e:
+                    return ErrorResponse(code=400, message=str(e))
 
+        except Exception:
+            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+
     def _create_streaming_error_response(self, message: str) -> str:
         error_response = ErrorResponse(
             code=400,
@@ -254,6 +254,7 @@ class OpenAIServingChat:
                         logprobs_res = self._create_chat_logprobs(
                             output_top_logprobs, request.logprobs, request.top_logprobs
                         )
+
                     if self.engine_client.data_processor.tool_parser_obj and not res["finished"]:
                         tool_delta_message = output["tool_delta_message"]
                         if tool_delta_message is None:
@@ -277,7 +278,6 @@ class OpenAIServingChat:
                         logprobs=logprobs_res,
                         arrival_time=arrival_time,
                     )
-
                     if res["finished"]:
                         num_choices -= 1
                         work_process_metrics.e2e_request_latency.observe(
@@ -309,7 +309,6 @@ class OpenAIServingChat:
                     if len(choices) == max_streaming_response_tokens or res["finished"]:
                         chunk.choices = choices
                         yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
-                        # 打印尾包
                        if res["finished"]:
                             api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
                         choices = []
@@ -417,8 +416,9 @@ class OpenAIServingChat:
                 if task_is_finished:
                     break
         finally:
-            self.engine_client.semaphore.release()
             dealer.close()
+            self.engine_client.semaphore.release()
+            api_server_logger.info(f"release {self.engine_client.semaphore.status()}")
 
         choices = []
         output = final_res["outputs"]
diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py
index 7261eeb78..43336dac6 100644
--- a/fastdeploy/entrypoints/openai/serving_completion.py
+++ b/fastdeploy/entrypoints/openai/serving_completion.py
@@ -101,6 +101,14 @@ class OpenAIServingCompletion:
         api_server_logger.info(f"start inference for request {num_choices}")
         prompt_batched_token_ids = []
         text_after_process_list = []
+        try:
+            if self.max_waiting_time < 0:
+                await self.engine_client.semaphore.acquire()
+            else:
+                await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
+        except Exception:
+            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+
         try:
             for idx, prompt in enumerate(request_prompts):
                 request_id_idx = f"{request_id}-{idx}"
@@ -117,14 +125,6 @@ class OpenAIServingCompletion:
 
             del current_req_dict
 
-            try:
-                if self.max_waiting_time < 0:
-                    await self.engine_client.semaphore.acquire()
-                else:
-                    await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
-            except Exception:
-                return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
-
             if request.stream:
                 return self.completion_stream_generator(
                     request=request,
diff --git a/fastdeploy/inter_communicator/zmq_client.py b/fastdeploy/inter_communicator/zmq_client.py
index 05e55929d..5143d9d47 100644
--- a/fastdeploy/inter_communicator/zmq_client.py
+++ b/fastdeploy/inter_communicator/zmq_client.py
@@ -31,7 +31,7 @@ class ZmqClient:
     """
 
     def __init__(self, name, mode):
-        self.context = zmq.Context()
+        self.context = zmq.Context(4)
         self.socket = self.context.socket(mode)
         self.file_name = f"/dev/shm/{name}.socket"
         self.router_path = f"/dev/shm/router_{name}.ipc"
@@ -67,6 +67,7 @@
         """
         self.router = self.context.socket(zmq.ROUTER)
         self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
+        self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
         self.router.setsockopt(zmq.SNDTIMEO, -1)
         self.router.bind(f"ipc://{self.router_path}")
 
@@ -125,6 +126,11 @@
                 else:
                     break
 
+        if self.req_dict[req_id] == -1:
+            if data[-1].finished:
+                with self.mutex:
+                    self.req_dict.pop(req_id, None)
+            return
         try:
             start_send = time.time()
             if self.aggregate_send:
@@ -133,7 +139,9 @@
                 result = msgpack.packb([response.to_dict() for response in data])
             self.router.send_multipart([self.req_dict[req_id], b"", result])
             llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")
-
+        except zmq.ZMQError as e:
+            llm_logger.error(f"[{req_id}] zmq error: {e}")
+            self.req_dict[req_id] = -1
         except Exception as e:
             llm_logger.error(f"Send result to zmq client failed: {e}")
 
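The sketch below is not part of the patch. It condenses, under stated assumptions, the ordering that the serving_chat.py and serving_completion.py changes move toward: acquire the per-engine concurrency semaphore before any request preprocessing, and release it exactly once on the way out. All names here (handle_request, MAX_WAITING_TIME, the stand-in preprocessing and inference steps) are hypothetical and are not FastDeploy APIs.

import asyncio

semaphore = asyncio.Semaphore(2)  # stand-in for engine_client.semaphore
MAX_WAITING_TIME = 1.0            # stand-in for the server's max_waiting_time


async def handle_request(payload: str) -> dict:
    try:
        # Acquire before any preprocessing, so every later failure path
        # already owns the slot and can release it in a single place.
        await asyncio.wait_for(semaphore.acquire(), timeout=MAX_WAITING_TIME)
    except asyncio.TimeoutError:
        return {"code": 408, "message": f"Request queued time exceed {MAX_WAITING_TIME}"}
    try:
        tokens = list(payload)       # stand-in for request preprocessing
        await asyncio.sleep(0.01)    # stand-in for inference
        return {"code": 200, "tokens": tokens}
    finally:
        semaphore.release()          # single release point, mirroring the patch's finally block


async def main() -> None:
    results = await asyncio.gather(*(handle_request(f"req{i}") for i in range(4)))
    print(results)


if __name__ == "__main__":
    asyncio.run(main())

Releasing in a finally block, rather than on each early-return path, is what keeps the control-signal slot from leaking when preprocessing or response generation fails.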