add error traceback info (#3419)

* add error traceback info

* update error msg

* update code

---------

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
Author: kevin
Date: 2025-08-19 19:32:04 +08:00
Committed by: GitHub
Parent: b047681c5d
Commit: 67298cf4c0
30 changed files with 166 additions and 83 deletions
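
The diffs below all apply one pattern: wherever an exception is caught and logged, traceback.format_exc() is appended to the message so the log carries the full stack trace instead of only str(e). A minimal sketch of the pattern, with an illustrative logger name and a stand-in failing function:

import logging
import traceback

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("fastdeploy")  # illustrative name

def do_data_transfer():
    raise RuntimeError("barrier reset failed")  # stand-in failure

try:
    do_data_transfer()
except Exception as e:
    # Before: logging only {e} loses where the error was raised.
    # format_exc() renders the exception currently being handled,
    # including file names and line numbers of every frame.
    logger.error(f"do_data_transfer: error: {e}, {traceback.format_exc()}")

Note that traceback.format_exc() already returns a string, so the str(...) wrapper seen throughout the diff is harmless but redundant; it must, however, be called inside the except block to see the active exception.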

View File

@@ -17,6 +17,7 @@
 import math
 import threading
 import time
+import traceback

 import numpy as np
 import paddle

@@ -309,4 +310,4 @@ class CacheMessager:
                 self.last_layer_idx = prefilled_layer_idx
         except Exception as e:
-            logger.error(f"prefill layerwise send cache thread has exception: {e}")
+            logger.error(f"prefill layerwise send cache thread has exception: {e}, {str(traceback.format_exc())}")

View File

@@ -19,6 +19,7 @@ import concurrent.futures
 import json
 import queue
 import time
+import traceback

 import numpy as np
 import paddle

@@ -342,7 +343,7 @@ class CacheTransferManager:
                 if self.rank == 0:
                     self.cache_task_queue.barrier3.reset()
         except Exception as e:
-            logger.info(f"do_data_transfer: error: {e}")
+            logger.info(f"do_data_transfer: error: {e}, {str(traceback.format_exc())}")

     def _transfer_data(
         self,

View File

@@ -20,6 +20,7 @@ import subprocess
 import sys
 import threading
 import time
+import traceback
 import uuid
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor

@@ -469,7 +470,7 @@ class PrefixCacheManager:
                 self.leaf_req_map[leaf_node].add(req_id)
                 self.cache_info[req_id] = (leaf_node, input_ids)
         except Exception as e:
-            logger.error(f"update_cache_blocks, error: {type(e)} {e}")
+            logger.error(f"update_cache_blocks, error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def request_match_blocks(self, task, block_size, *args):

@@ -555,7 +556,7 @@ class PrefixCacheManager:
             )
             return common_block_ids, matched_token_num, hit_info
         except Exception as e:
-            logger.error(f"request_block_ids: error: {type(e)} {e}")
+            logger.error(f"request_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def request_block_ids(self, task, block_size, dec_token_num, *args):

@@ -660,7 +661,7 @@ class PrefixCacheManager:
             )
             return common_block_ids, unique_block_ids, hit_info
         except Exception as e:
-            logger.error(f"request_block_ids: error: {type(e)} {e}")
+            logger.error(f"request_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def release_block_ids_async(self, task):

@@ -709,7 +710,7 @@ class PrefixCacheManager:
             )
             return
         except Exception as e:
-            logger.error(f"release_block_ids: error: {type(e)} {e}")
+            logger.error(f"release_block_ids: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def _handle_free_gpu_node_without_cpu(self, node):

@@ -899,7 +900,7 @@ class PrefixCacheManager:
             else:
                 self.gpu_free_task_future = None
         except Exception as e:
-            logger.error(f"free_block_ids_async: error: {type(e)} {e}")
+            logger.error(f"free_block_ids_async: error: {type(e)} {e}, {str(traceback.format_exc())}")
             raise e

     def free_cpu_block_ids(self, need_block_num):

@@ -1218,5 +1219,5 @@ class PrefixCacheManager:
                     + f"task_cpu_block_id {task_cpu_block_id} event_type {event_type} done"
                 )
         except Exception as e:
-            logger.warning(f"recv_data_transfer_result: error: {e}")
+            logger.warning(f"recv_data_transfer_result: error: {e}, {str(traceback.format_exc())}")
             raise e
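
PrefixCacheManager's handlers log the traceback and then re-raise, so the detailed trace is recorded at the failure site while callers still observe the exception. A small sketch of that log-and-reraise shape (logger name and failure are illustrative):

import logging
import traceback

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("cache_manager")  # illustrative name

def update_cache_blocks(req_id):
    try:
        raise KeyError(req_id)  # stand-in for a cache-update failure
    except Exception as e:
        # format_exc() is only meaningful inside an except block: it renders
        # the exception currently being handled, with the raising frame.
        logger.error(f"update_cache_blocks, error: {type(e)} {e}, {traceback.format_exc()}")
        raise e  # callers still see the failure; the trace is already logged

try:
    update_cache_blocks("req-1")
except KeyError:
    pass  # caller-side handling; no information was lost

The code re-raises with "raise e"; a bare "raise" would leave the original traceback object untouched, but since the formatted trace was already logged, the difference is cosmetic here.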

View File

@@ -597,7 +597,7 @@ class LLMEngine:
                     time.sleep(0.001)
             except Exception as e:
-                llm_logger.error(f"Error in main loop: {e}")
+                llm_logger.error(f"Error in main loop: {e}, {str(traceback.format_exc())}")
                 time.sleep(0.1)

         threading.Thread(target=receiver_loop, daemon=True).start()

@@ -985,7 +985,9 @@ class LLMEngine:
                 try:
                     os.killpg(p.pid, signal.SIGTERM)
                 except Exception as e:
-                    print(f"Error extracting file: {e}")
+                    console_logger.error(
+                        f"Error killing cache manager process {p.pid}: {e}, {str(traceback.format_exc())}"
+                    )
         self.worker_ready_signal.clear()
         self.exist_task_signal.clear()
         self.exist_swapped_task_signal.clear()

@@ -998,7 +1000,7 @@ class LLMEngine:
             try:
                 os.killpg(self.worker_proc.pid, signal.SIGTERM)
             except Exception as e:
-                print(f"Error extracting sub services: {e}")
+                console_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}")

         self.engine_worker_queue.cleanup()
         if hasattr(self, "zmq_server") and self.zmq_server is not None:

@@ -1173,7 +1175,7 @@ class LLMEngine:
         try:
             req_id = self._format_and_add_data(prompts)
         except Exception as e:
-            llm_logger.error(f"Error happend while adding request, details={e}")
+            llm_logger.error(f"Error happend while adding request, details={e}, {str(traceback.format_exc())}")
             raise EngineError(str(e), error_code=400)

         # Get the result of the current request

View File

@@ -269,7 +269,7 @@ class ExpertService:
                         time.sleep(0.001)
                     continue
             except Exception as e:
-                llm_logger.error(f"get decode tasks error: {e}")
+                llm_logger.error(f"get decode tasks error: {e}, {str(traceback.format_exc())}")

         threading.Thread(target=receiver_loop, daemon=True).start()

@@ -378,4 +378,4 @@ def start_expert_service(cfg, local_data_parallel_id, ipc_signal_suffix):
         expert_service.start(ipc_signal_suffix, local_data_parallel_id)
         expert_service.split_connector.start_receiver()
     except Exception as e:
-        llm_logger.exception(f"Expert service failed to start: {e}")
+        llm_logger.exception(f"Expert service failed to start: {e}, {str(traceback.format_exc())}")

View File

@@ -16,6 +16,7 @@

 import threading
 import time
+import traceback
 from collections import deque
 from collections.abc import Iterable
 from concurrent.futures import ThreadPoolExecutor

@@ -389,7 +390,7 @@ class ResourceManagerV1(ResourceManager):
                 request.cache_prepare_time = time.time() - cache_prepare_time
             return True
         except Exception as e:
-            llm_logger.error(f"prefix match blocks error: {e}, waiting reschedule...")
+            llm_logger.error(f"prefix match blocks error: {e}, {str(traceback.format_exc())} waiting reschedule...")
             return False

     def add_request(self, request: Request) -> None:

@@ -441,4 +442,4 @@ class ResourceManagerV1(ResourceManager):
                     self.stop_flags[request.idx] = True
                     del self.requests[req_id]
         except Exception as e:
-            llm_logger.error(e)
+            llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}")

View File

@@ -15,6 +15,7 @@
 """

 import json
+import traceback

 import uvicorn
 from fastapi import FastAPI

@@ -114,7 +115,7 @@ def launch_api_server(args) -> None:
             log_level="info",
         )  # set log level to error to avoid log
     except Exception as e:
-        api_server_logger.error(f"launch sync http server error, {e}")
+        api_server_logger.error(f"launch sync http server error, {e}, {str(traceback.format_exc())}")

 def main():

View File

@@ -15,6 +15,7 @@
 """

 import time
+import traceback
 import uuid

 import numpy as np

@@ -141,7 +142,7 @@ class EngineClient:
             work_process_metrics.prompt_tokens_total.inc(input_ids_len)
             work_process_metrics.request_prompt_tokens.observe(input_ids_len)
         except Exception as e:
-            api_server_logger.error(e)
+            api_server_logger.error(f"add_requests error: {e}, {str(traceback.format_exc())}")
             raise EngineError(str(e), error_code=400)

         if input_ids_len + min_tokens >= self.max_model_len:

@@ -194,7 +195,7 @@ class EngineClient:
             else:
                 self.zmq_client.send_pyobj(task)
         except Exception as e:
-            api_server_logger.error(e)
+            api_server_logger.error(f"zmq_client send task error: {e}, {str(traceback.format_exc())}")
             raise EngineError(str(e), error_code=400)

     def vaild_parameters(self, data):

View File

@@ -346,7 +346,7 @@ class LLM:
             return result
         except Exception as e:
-            llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}")
+            llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}, {str(traceback.format_exc())}")

     def _run_engine(self, req_ids: list[str], use_tqdm: bool, topk_logprobs: Optional[int] = None):
         """

View File

@@ -18,6 +18,7 @@ import asyncio
 import os
 import threading
 import time
+import traceback
 from collections.abc import AsyncGenerator
 from contextlib import asynccontextmanager
 from multiprocessing import current_process

@@ -159,7 +160,7 @@ async def lifespan(app: FastAPI):
         multiprocess.mark_process_dead(os.getpid())
         api_server_logger.info(f"Closing metrics client pid: {pid}")
     except Exception as e:
-        api_server_logger.warning(e)
+        api_server_logger.warning(f"exit error: {e}, {str(traceback.format_exc())}")

 app = FastAPI(lifespan=lifespan)

@@ -355,7 +356,7 @@ def launch_api_server() -> None:
             log_level="info",
         )  # set log level to error to avoid log
     except Exception as e:
-        api_server_logger.error(f"launch sync http server error, {e}")
+        api_server_logger.error(f"launch sync http server error, {e}, {str(traceback.format_exc())}")

 metrics_app = FastAPI()

View File

@@ -101,7 +101,9 @@ class OpenAIServingChat:
             if isinstance(prompt_token_ids, np.ndarray):
                 prompt_token_ids = prompt_token_ids.tolist()
         except Exception as e:
-            return ErrorResponse(code=400, message=str(e))
+            error_msg = f"request[{request_id}] generator error: {str(e)}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
+            return ErrorResponse(code=400, message=error_msg)
         del current_req_dict

@@ -115,11 +117,19 @@ class OpenAIServingChat:
                     request, request_id, request.model, prompt_token_ids, text_after_process
                 )
             except Exception as e:
-                return ErrorResponse(code=400, message=str(e))
+                error_msg = f"request[{request_id}]full generator error: {str(e)}, {str(traceback.format_exc())}"
+                api_server_logger.error(error_msg)
+                return ErrorResponse(code=408, message=error_msg)
-        except Exception:
-            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+        except Exception as e:
+            error_msg = (
+                f"request[{request_id}] waiting error: {str(e)}, {str(traceback.format_exc())}, "
+                f"max waiting time: {self.max_waiting_time}"
+            )
+            api_server_logger.error(error_msg)
+            return ErrorResponse(code=408, message=error_msg)

     def _create_streaming_error_response(self, message: str) -> str:
+        api_server_logger.error(message)
         error_response = ErrorResponse(
             code=400,
             message=message,

@@ -336,7 +346,9 @@ class OpenAIServingChat:
                 yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
         except Exception as e:
-            error_data = self._create_streaming_error_response(str(e))
+            error_data = self._create_streaming_error_response(
+                f"request[{request_id}] generate stream error: {str(e)}, {str(traceback.format_exc())}"
+            )
             yield f"data: {error_data}\n\n"
         finally:
             dealer.close()

@@ -556,6 +568,6 @@ class OpenAIServingChat:
             return LogProbs(content=[sampled_entry])
         except Exception as e:
-            api_server_logger.error("Error in _build_logprobs_response: %s", e)
-            api_server_logger.error(traceback.format_exc())
+            error_msg = f"Error in _build_logprobs_response: {e}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
             return None
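
The waiting-error branch above replaces a bare "except Exception:" around the semaphore acquire. With asyncio.wait_for, exceeding max_waiting_time raises asyncio.TimeoutError, and the new handler names the request and includes the trace. A self-contained sketch of that timeout path (names are illustrative, and print stands in for api_server_logger):

import asyncio
import traceback

async def acquire_with_timeout(semaphore, request_id, max_waiting_time):
    try:
        # wait_for cancels the acquire and raises asyncio.TimeoutError
        # once max_waiting_time seconds elapse without a free slot.
        await asyncio.wait_for(semaphore.acquire(), timeout=max_waiting_time)
        return None
    except Exception as e:
        error_msg = (
            f"request[{request_id}] waiting error: {e}, {traceback.format_exc()}, "
            f"max waiting time: {max_waiting_time}"
        )
        print(error_msg)  # the real code logs via api_server_logger.error
        return error_msg

async def main():
    sem = asyncio.Semaphore(0)  # no free slots, so the acquire must time out
    assert await acquire_with_timeout(sem, "req-1", 0.1) is not None

asyncio.run(main())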

View File

@@ -16,6 +16,7 @@

 import asyncio
 import time
+import traceback
 import uuid
 from typing import List, Optional

@@ -92,7 +93,9 @@ class OpenAIServingCompletion:
             else:
                 raise ValueError("Prompt must be a string, a list of strings or a list of integers.")
         except Exception as e:
-            return ErrorResponse(message=str(e), code=400)
+            error_msg = f"OpenAIServingCompletion create_completion: {e}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
+            return ErrorResponse(message=error_msg, code=400)

         if request_prompt_ids is not None:
             request_prompts = request_prompt_ids

@@ -106,8 +109,13 @@ class OpenAIServingCompletion:
                 await self.engine_client.semaphore.acquire()
             else:
                 await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
-        except Exception:
-            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+        except Exception as e:
+            error_msg = (
+                f"OpenAIServingCompletion waiting error: {e}, {str(traceback.format_exc())}, "
+                f"max waiting time: {self.max_waiting_time}"
+            )
+            api_server_logger.error(error_msg)
+            return ErrorResponse(code=408, message=error_msg)

         try:
             for idx, prompt in enumerate(request_prompts):

@@ -121,6 +129,8 @@ class OpenAIServingCompletion:
                     text_after_process_list.append(current_req_dict.get("text_after_process"))
                     prompt_batched_token_ids.append(prompt_token_ids)
                 except Exception as e:
+                    error_msg = f"OpenAIServingCompletion format error: {e}, {str(traceback.format_exc())}"
+                    api_server_logger.error(error_msg)
                     return ErrorResponse(message=str(e), code=400)

             del current_req_dict

@@ -147,10 +157,16 @@ class OpenAIServingCompletion:
                     text_after_process_list=text_after_process_list,
                 )
             except Exception as e:
-                return ErrorResponse(code=400, message=str(e))
+                error_msg = (
+                    f"OpenAIServingCompletion completion_full_generator error: {e}, {str(traceback.format_exc())}"
+                )
+                api_server_logger.error(error_msg)
+                return ErrorResponse(code=400, message=error_msg)

         except Exception as e:
-            return ErrorResponse(message=str(e), code=400)
+            error_msg = f"OpenAIServingCompletion create_completion error: {e}, {str(traceback.format_exc())}"
+            api_server_logger.error(error_msg)
+            return ErrorResponse(message=error_msg, code=400)

     async def completion_full_generator(
         self,

@@ -431,6 +447,7 @@ class OpenAIServingCompletion:
                     choices = []
         except Exception as e:
+            api_server_logger.error(f"Error in completion_stream_generator: {e}, {str(traceback.format_exc())}")
             yield f"data: {ErrorResponse(message=str(e), code=400).model_dump_json(exclude_unset=True)}\n\n"
         finally:
             del request

@@ -614,5 +631,5 @@ class OpenAIServingCompletion:
             )
         except Exception as e:
-            api_server_logger.error("Error in _build_logprobs_response: %s", e)
+            api_server_logger.error(f"Error in _build_logprobs_response: {str(e)}, {str(traceback.format_exc())}")
             return None
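
The completion endpoints stream Server-Sent Events, so a failure mid-stream cannot become an HTTP error response; instead the generator logs the trace and yields one final "data:" frame describing the error. A sketch of that framing, using a plain dict where the real code serializes a pydantic ErrorResponse:

import json
import traceback

def completion_stream(chunks):
    try:
        for chunk in chunks:
            yield f"data: {json.dumps(chunk)}\n\n"
    except Exception as e:
        # Mirrors the diff above: the traceback rides in the final frame.
        error = {"code": 400, "message": f"stream error: {e}, {traceback.format_exc()}"}
        yield f"data: {json.dumps(error)}\n\n"

def broken_chunks():
    yield {"text": "ok"}
    raise ValueError("decode failed")  # stand-in mid-stream failure

for frame in completion_stream(broken_chunks()):
    print(frame, end="")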

View File

@@ -14,6 +14,7 @@

 import json
 import re
+import traceback
 import uuid
 from collections.abc import Sequence
 from typing import Union

@@ -162,10 +163,12 @@ class ErnieX1ToolParser(ToolParser):
                                 }
                             )
                         except Exception as e:
-                            data_processor_logger.debug(f"Failed to parse tool call: {str(e)}")
+                            data_processor_logger.error(
+                                f"Failed to parse tool call: {str(e)}, {str(traceback.format_exc())}"
+                            )
                             continue
             except Exception as e:
-                data_processor_logger.debug(f"Failed to parse tool call: {str(e)}")
+                data_processor_logger.error(f"Failed to parse tool call: {str(e)}, {str(traceback.format_exc())}")
                 continue

         if not function_call_arr:

@@ -211,7 +214,9 @@ class ErnieX1ToolParser(ToolParser):
             )
         except Exception as e:
-            data_processor_logger.error(f"Error in extracting tool call from response: {str(e)}")
+            data_processor_logger.error(
+                f"Error in extracting tool call from response: {str(e)}, {str(traceback.format_exc())}"
+            )
             return ExtractedToolCallInformation(tools_called=False, tool_calls=None, content=model_output)

     def extract_tool_calls_streaming(

@@ -302,7 +307,9 @@ class ErnieX1ToolParser(ToolParser):
                             self.streamed_args_for_tool[self.current_tool_id] = args_json
                         return delta
                 except Exception as e:
-                    data_processor_logger.debug(f"Partial arguments parsing: {str(e)}")
+                    data_processor_logger.error(
+                        f"Partial arguments parsing: {str(e)}, {str(traceback.format_exc())}"
+                    )

             if "</tool_call>" in self.buffer:
                 end_pos = self.buffer.find("</tool_call>")

@@ -316,5 +323,7 @@ class ErnieX1ToolParser(ToolParser):
             return delta
         except Exception as e:
-            data_processor_logger.error(f"Error in streaming tool call extraction: {str(e)}")
+            data_processor_logger.error(
+                f"Error in streaming tool call extraction: {str(e)}, {str(traceback.format_exc())}"
+            )
             return None

View File

@@ -14,6 +14,8 @@
 # limitations under the License.
 """

+import traceback
+
 import numpy as np
 from paddleformers.generation import GenerationConfig

@@ -152,7 +154,7 @@ class ErnieMoEVLProcessor(ErnieProcessor):
             return kwargs
         except Exception as e:
-            data_processor_logger.warning(f"Invalid mm-processor-kwargs format: {e}")
+            data_processor_logger.warning(f"Invalid mm-processor-kwargs format: {e}, {str(traceback.format_exc())}")
             return {}

     def _parse_limits(self, limits):

View File

@@ -16,6 +16,7 @@

 import threading
 import time
+import traceback
 from multiprocessing.managers import (
     AcquirerProxy,
     BaseManager,

@@ -275,5 +276,5 @@ class EngineCacheQueue:
         try:
             return len(self.transfer_task_queue) == 0
         except Exception as e:
-            logger.error(f"empty function meets error: {e}")
+            logger.error(f"empty function meets error: {e}, {str(traceback.format_exc())}")
             raise e

View File

@@ -17,6 +17,7 @@
 import os
 import threading
 import time
+import traceback

 import msgpack
 import zmq

@@ -143,7 +144,7 @@ class ZmqClient:
                     llm_logger.error(f"[{req_id}] zmq error: {e}")
                     self.req_dict[req_id] = -1
                 except Exception as e:
-                    llm_logger.error(f"Send result to zmq client failed: {e}")
+                    llm_logger.error(f"Send result to zmq client failed: {e}, {str(traceback.format_exc())}")

             if data[-1].finished:
                 with self.mutex:

@@ -163,7 +164,7 @@ class ZmqClient:
             return None, None
         except Exception as e:
             self.close()
-            llm_logger.warning(f"{e}")
+            llm_logger.warning(f"{e}, {str(traceback.format_exc())}")
             return str(e), None

     def receive_pyobj_once(self, block=False):

@@ -179,7 +180,7 @@ class ZmqClient:
             return None, None
         except Exception as e:
             self.close()
-            llm_logger.warning(f"{e}")
+            llm_logger.warning(f"{e}, {str(traceback.format_exc())}")
             return str(e), None

     def _clear_ipc(self, name):

@@ -214,7 +215,7 @@ class ZmqClient:
             self._clear_ipc(self.file_name)
             self._clear_ipc(self.router_path)
         except Exception as e:
-            llm_logger.warning(f"Failed to close ZMQ connection - {e}")
+            llm_logger.warning(f"Failed to close ZMQ connection - {e}, {str(traceback.format_exc())}")
             return

     def __exit__(self, exc_type, exc_val, exc_tb):

View File

@@ -15,6 +15,7 @@
 """

 import os
+import traceback
 from concurrent.futures import ThreadPoolExecutor

 from fastdeploy.config import ErnieArchitectures, FDConfig

@@ -300,7 +301,7 @@ class BackendBase:
             return tokenizer
         except Exception as e:
-            raise Exception(f"Fail to initialize hf tokenizer: {e}")
+            raise Exception(f"Fail to initialize hf tokenizer: {e}, {str(traceback.format_exc())}")

     def add_cache(self, schemata_key: tuple[str, str], processor: LogitsProcessorBase) -> None:
         """

View File

@@ -16,6 +16,7 @@

 import json
 import re
+import traceback
 from typing import Any, List, Optional

 import paddle

@@ -263,7 +264,7 @@ class XGrammarBackend(BackendBase):
         try:
             compiled_grammar = self.grammar_compiler.compile_json_schema(schemata, any_whitespace=self.any_whitespace)
         except Exception as e:
-            llm_logger.error(f"Failed to compile json schema: {e}")
+            llm_logger.error(f"Failed to compile json schema: {e}, {str(traceback.format_exc())}")
             return None
         return self._create_processor(compiled_grammar)

@@ -280,7 +281,7 @@ class XGrammarBackend(BackendBase):
         try:
             compiled_grammar = self.grammar_compiler.compile_regex(schemata)
         except Exception as e:
-            llm_logger.error(f"Failed to compile regex schema: {e}")
+            llm_logger.error(f"Failed to compile regex schema: {e}, {str(traceback.format_exc())}")
             return None
         return self._create_processor(compiled_grammar)

@@ -297,7 +298,7 @@ class XGrammarBackend(BackendBase):
         try:
             compiled_grammar = self.grammar_compiler.compile_grammar(schemata)
         except Exception as e:
-            llm_logger.error(f"Failed to compile ebnf schema: {e}")
+            llm_logger.error(f"Failed to compile ebnf schema: {e}, {str(traceback.format_exc())}")
             return None
         return self._create_processor(compiled_grammar)

@@ -324,7 +325,7 @@ class XGrammarBackend(BackendBase):
             compiled_grammar = self.grammar_compiler.compile_structural_tag(tags, structural_tag["triggers"])
         except Exception as e:
-            llm_logger.error(f"Failed to compile structural tags schema: {e}")
+            llm_logger.error(f"Failed to compile structural tags schema: {e}, {str(traceback.format_exc())}")
             return None
         return self._create_processor(compiled_grammar)

View File

@@ -201,7 +201,7 @@ class TokenProcessor:
                         self.prefill_time_signal.value[current_index] = 0
                         current_index += 1
             except Exception as e:
-                llm_logger.error(f"Error processing prefill metrics: {e}")
+                llm_logger.error(f"Error processing prefill metrics: {e}, {str(traceback.format_exc())}")

         self.executor.submit(process_metrics)

@@ -215,7 +215,7 @@ class TokenProcessor:
         try:
             self.cached_generated_tokens.put_results(batch_result)
         except Exception as e:
-            llm_logger.error(f"Error in TokenProcessor's postprocess: {e}")
+            llm_logger.error(f"Error in TokenProcessor's postprocess: {e}, {str(traceback.format_exc())}")

     def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False):
         """

View File

@@ -14,6 +14,8 @@
 # limitations under the License.
 """

+import traceback
+
 import paddle

 from fastdeploy.utils import console_logger as logger

@@ -40,7 +42,8 @@ class CUDAPlatform(Platform):
             logger.warning(
                 "You are using GPU version PaddlePaddle, but there is no GPU "
                 "detected on your machine. Maybe CUDA devices is not set properly."
-                f"\n Original Error is {e}"
+                f"\n Original Error is {e}, "
+                f"{str(traceback.format_exc())}"
             )
             return False
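
This hunk, and the four platform hunks that follow it, rely on Python's implicit concatenation of adjacent string literals inside the logger.warning(...) call, which is why the fix splits the tail into two f-strings rather than passing a second argument. A quick demonstration of how the pieces join into one message (print stands in for the logger, and the RuntimeError stands in for the paddle device probe):

import traceback

try:
    raise RuntimeError("no CUDA device found")  # stand-in for the paddle probe
except Exception as e:
    # Adjacent string literals (including f-strings) concatenate into a
    # single argument, so these four pieces form one message, trace included.
    message = (
        "You are using GPU version PaddlePaddle, but there is no GPU "
        "detected on your machine. Maybe CUDA devices is not set properly."
        f"\n Original Error is {e}, "
        f"{traceback.format_exc()}"
    )
    print(message)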

View File

@@ -14,6 +14,8 @@
 """
 dcu platform file
 """

+import traceback
+
 import paddle
 from paddleformers.utils.log import logger

@@ -39,7 +41,8 @@ class DCUPlatform(Platform):
             logger.warning(
                 "You are using GPU version PaddlePaddle, but there is no GPU "
                 "detected on your machine. Maybe CUDA devices is not set properly."
-                f"\n Original Error is {e}"
+                f"\n Original Error is {e}, "
+                f"{str(traceback.format_exc())}"
             )
             return False

View File

@@ -14,6 +14,8 @@
 # limitations under the License.
 """

+import traceback
+
 import paddle

 from fastdeploy.utils import console_logger as logger

@@ -40,7 +42,8 @@ class GCUPlatform(Platform):
             logger.warning(
                 "You are using GCUPlatform, but there is no GCU "
                 "detected on your machine. Maybe GCU devices is not set properly."
-                f"\n Original Error is {e}"
+                f"\n Original Error is {e}, "
+                f"{str(traceback.format_exc())}"
             )
             return False

View File

@@ -17,6 +17,7 @@
 """
 maca platform file
 """

+import traceback
 import paddle
 from paddleformers.utils.log import logger

@@ -43,7 +44,8 @@ class MACAPlatform(Platform):
             logger.warning(
                 "You are using GPU version PaddlePaddle, but there is no GPU "
                 "detected on your machine. Maybe CUDA devices is not set properly."
-                f"\n Original Error is {e}"
+                f"\n Original Error is {e}, "
+                f"{str(traceback.format_exc())}"
             )
             return False

View File

@@ -11,6 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import traceback
+
 import paddle

 from fastdeploy.utils import console_logger as logger

@@ -38,7 +40,8 @@ class XPUPlatform(Platform):
             logger.warning(
                 "You are using XPU version PaddlePaddle, but there is no XPU "
                 "detected on your machine. Maybe CUDA devices is not set properly."
-                f"\n Original Error is {e}"
+                f"\n Original Error is {e}, "
+                f"{str(traceback.format_exc())}"
             )
             return False

View File

@@ -237,7 +237,7 @@ class GlobalScheduler:
                 )
                 time.sleep(self.keep_alive_duration / 2)
             except Exception as e:
-                scheduler_logger.error(f"Scheduler keep alive failed: {e}")
+                scheduler_logger.error(f"Scheduler keep alive failed: {e}, {str(traceback.format_exc())}")
                 time.sleep(min(3, self.keep_alive_duration / 4))

     def _scheduler_name_from_request_queue(self, request_queue: str) -> str:

View File

@@ -20,6 +20,7 @@ import math
 import random
 import threading
 import time
+import traceback
 from collections import deque
 from typing import List

@@ -379,7 +380,7 @@ class ResultReader:
                 if total == 0:
                     time.sleep(0.01)
             except Exception as e:
-                logger.error(f"ResultsReader{self.idx} sync results error: {e!s}")
+                logger.error(f"ResultsReader{self.idx} sync results error: {e!s}, {str(traceback.format_exc())}")

     def sync_results(self, keys):
         """

@@ -402,7 +403,7 @@ class ResultReader:
                 result = RequestOutput.from_dict(data)
                 self.data.appendleft(result)
             except Exception as e:
-                logger.error(f"Parse Result Error:{e}, {result}")
+                logger.error(f"Parse Result Error:{e}, {str(traceback.format_exc())}, {result}")
         return total

@@ -498,7 +499,7 @@ class APIScheduler:
             except IndexError:
                 continue
             except Exception as e:
-                logger.error(f"APIScheduler Schedule req error: {e!s}")
+                logger.error(f"APIScheduler Schedule req error: {e!s}, {str(traceback.format_exc())}")

     def schedule(self, req, pnodes, dnodes, mnodes, group=""):
         """

@@ -573,8 +574,8 @@ class APIScheduler:
                         # logger.info(f"clear expired nodes: {nodeid}")
                         self.client.hdel(self.cluster_key, nodeid)
                 time.sleep(self.clear_expired_nodes_period)
-            except Exception:
-                logger.error("APIScheduler clear expired nodes error: {str(e)}")
+            except Exception as e:
+                logger.error(f"APIScheduler clear expired nodes error: {str(e)}, {str(traceback.format_exc())}")

     def select_pd(self, req, nodes, role):
         """

@@ -664,7 +665,7 @@ class ResultWriter:
                 # e = time.time()
                 # logger.info(f"Lpush {self.idx}: {key} used {e-s} {len(items)} items")
             except Exception as e:
-                logger.error(f"ResultWriter write error: {e!s}")
+                logger.error(f"ResultWriter write error: {e!s}, {str(traceback.format_exc())}")

 class InferScheduler:

@@ -723,7 +724,7 @@ class InferScheduler:
                 self.client.hset(self.cluster_key, self.nodeid, info)
                 time.sleep(self.sync_period / 1000.0)
             except Exception as e:
-                logger.error(f"InferScheduler routine report error: {e!s}")
+                logger.error(f"InferScheduler routine report error: {e!s}, {str(traceback.format_exc())}")

     def loop_expire_reqs(self):
         """

@@ -733,8 +734,8 @@ class InferScheduler:
             try:
                 self.node.expire_reqs(self.release_load_expire_period)
                 time.sleep(60)
-            except Exception:
-                logger.error("InferScheduler expire reqs error: {e}")
+            except Exception as e:
+                logger.error(f"InferScheduler expire reqs error: {e}, {str(traceback.format_exc())}")

     def loop_get_reqs(self):
         """

@@ -772,7 +773,7 @@ class InferScheduler:
                     else:
                         self.node.add_req(req.request_id, 1)
             except Exception as e:
-                logger.error(f"InferScheduler loop get reqs error: {e!s}")
+                logger.error(f"InferScheduler loop get reqs error: {e!s}, {str(traceback.format_exc())}")

     def get_requests(
         self,

@@ -807,7 +808,8 @@ class InferScheduler:
                     return reqs
                 # logger.info(f"Get Requests from Scheduler: {req.request_id}")
                 reqs.append(req)
-            except Exception:
+            except Exception as e:
+                logger.error(f"InferScheduler get requests error: {e}, {str(traceback.format_exc())}")
                 return reqs
         return reqs

View File

@@ -16,6 +16,7 @@

 import json
 import time
+import traceback
 from concurrent.futures import ThreadPoolExecutor
 from typing import Dict

@@ -97,7 +98,7 @@ class SplitwiseConnector:
                     time.sleep(0.001)
             except Exception as e:
-                logger.error(f"Receiver error: {e}")
+                logger.error(f"Receiver error: {e}, {str(traceback.format_exc())}")
                 time.sleep(1)

     def _get_push_socket(self, addr):

@@ -152,7 +153,7 @@ class SplitwiseConnector:
             except zmq.Again:
                 logger.warning(f"Send queue full for {addr}")
             except Exception as e:
-                logger.error(f"Send to {addr} failed: {e}")
+                logger.error(f"Send to {addr} failed: {e}, {str(traceback.format_exc())}")
                 self._close_connection(addr)

         except Exception as e:

@@ -433,7 +434,7 @@ class SplitwiseConnector:
             self.engine_worker_queue.put_cache_info(payload)
         except Exception as e:
-            logger.error(f"Message processing failed: {e}")
+            logger.error(f"Message processing failed: {e}, {str(traceback.format_exc())}")

     def _handle_prefill(self, tasks):
         """

View File

@@ -15,6 +15,7 @@
 """

 import os
+import traceback

 def check_safetensors_model(model_dir: str):

@@ -45,5 +46,5 @@ def check_safetensors_model(model_dir: str):
             sum(flags) == safetensors_num
         ), f"Number of safetensor files should be {len(model_files)}, but now it's {sum(flags)}"
     except Exception as e:
-        raise Exception(f"Failed to check unified checkpoint, details: {e}.")
+        raise Exception(f"Failed to check unified checkpoint, details: {e}, {str(traceback.format_exc())}.")
     return is_safetensors

View File

@@ -6,6 +6,7 @@
 import os
 import re
+import traceback
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from urllib.parse import urlparse, urlunparse

@@ -122,7 +123,7 @@ def query_model(prompt):
         )
         return response.choices[0].message.content.strip()
     except Exception as e:
-        return f"[Error] {e}"
+        return f"[Error] {e}, {str(traceback.format_exc())}"

 # ========== Evaluation functions ==========

View File

@@ -7,6 +7,7 @@ import socket
 import subprocess
 import sys
 import time
+import traceback

 import requests
 import yaml

@@ -175,7 +176,7 @@ def stop_server(signum=None, frame=None):
         # Terminate the process group (including all child processes)
         os.killpg(os.getpgid(pid_port["PID"]), signal.SIGTERM)
     except Exception as e:
-        print(f"Failed to stop server: {e}")
+        print(f"Failed to stop server: {e}, {str(traceback.format_exc())}")

     for port in [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT]:
         try:

@@ -184,7 +185,7 @@ def stop_server(signum=None, frame=None):
                 os.kill(int(pid), signal.SIGKILL)
             print(f"Killed process on port {port}, pid={pid}")
         except Exception as e:
-            print(f"Failed to killed process on port: {e}")
+            print(f"Failed to kill process on port: {e}, {str(traceback.format_exc())}")

     # If the log directory exists, rename it to log_<timestamp>
     if os.path.isdir("./log"):
         os.rename("./log", "./log_{}".format(time.strftime("%Y%m%d%H%M%S")))

@@ -229,8 +230,10 @@ def start_service():
         # Build the command
         cmd = build_command(final_config)
     except Exception as e:
+        error_msg = f"Failed to start service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )

@@ -264,8 +267,10 @@ def start_service():
         return Response(json.dumps(json_data, ensure_ascii=False), status=200, content_type="application/json")
     except Exception as e:
+        error_msg = f"Failed to start service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )

@@ -295,8 +300,10 @@ def switch_service():
         # Build the command
         cmd = build_command(final_config)
     except Exception as e:
+        error_msg = f"Failed to switch service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )

@@ -330,8 +337,10 @@ def switch_service():
         return Response(json.dumps(json_data, ensure_ascii=False), status=200, content_type="application/json")
     except Exception as e:
+        error_msg = f"Failed to switch service: {e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False),
+            json.dumps({"status": "error", "message": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )

@@ -406,8 +415,10 @@ def get_config():
         )
     except Exception as e:
+        error_msg = f"{e}, {str(traceback.format_exc())}"
+        print(error_msg)
         return Response(
-            json.dumps({"message": "failed to parse api_server.log, please check the log", "error": str(e)}, ensure_ascii=False),
+            json.dumps({"message": "failed to parse api_server.log, please check the log", "error": error_msg}, ensure_ascii=False),
             status=500,
             content_type="application/json",
         )

@@ -447,7 +458,7 @@ def wait_for_infer():
         with open(path, "r", encoding="utf-8", errors="ignore") as f:
             return "".join(f.readlines()[-lines:])
     except Exception as e:
-        return f"[unable to read {path}]: {e}\n"
+        return f"[unable to read {path}]: {e}, {str(traceback.format_exc())}\n"

     result = f"Service startup timed out after [{timeout}s]\n\n"
     result += "==== server.log tail 50 ====\n"