add error log to file (#3431)
Some checks failed
Deploy GitHub Pages / deploy (push) Has been cancelled

* feat(log):add_request_and_response_log

* feat[log]:add error log to file
This commit is contained in:
xiaolei373
2025-08-20 09:52:34 +08:00
committed by GitHub
parent 3a6058e445
commit 5d131485d8
11 changed files with 1396 additions and 30 deletions

2
.gitignore vendored
View File

@@ -121,7 +121,7 @@ dmypy.json
FETCH_HEAD
#log
log*/
log/
checkpoints/
checkpoints_origin/

View File

View File

@@ -0,0 +1,55 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
"""
自定义日志格式化器模块
该模块定义了 ColoredFormatter 类,用于在控制台输出带颜色的日志信息,
便于开发者在终端中快速识别不同级别的日志。
"""
import logging
class ColoredFormatter(logging.Formatter):
    """Console log formatter that wraps records in ANSI color codes.

    Colors by level:
      - WARNING: yellow
      - ERROR / CRITICAL: red
      - any other level: terminal default (no wrapping)
    """

    # Mapping of log level -> ANSI SGR color code.
    COLOR_CODES = {
        logging.WARNING: 33,  # yellow
        logging.ERROR: 31,  # red
        logging.CRITICAL: 31,  # red
    }

    def format(self, record):
        """Render *record* and colorize it when its level has a color.

        Args:
            record (LogRecord): the record to render.

        Returns:
            str: the formatted message, wrapped in an ANSI color prefix and
            reset suffix for WARNING/ERROR/CRITICAL, unchanged otherwise.
        """
        rendered = super().format(record)
        code = self.COLOR_CODES.get(record.levelno, 0)
        if not code:
            return rendered
        return f"\033[{code}m{rendered}\033[0m"

View File

@@ -0,0 +1,367 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import codecs
import logging
import os
import re
import time
from logging.handlers import BaseRotatingHandler
from pathlib import Path
"""自定义日志处理器模块:
该模块包含FastDeploy项目中使用的自定义日志处理器实现
用于处理和控制日志输出格式、级别和目标等。
"""
class IntervalRotatingFileHandler(BaseRotatingHandler):
    """Rotating handler that groups logs into one directory per day.

    Creates a ``YYYY-MM-DD`` directory per day and a new file
    (``prefix_YYYY-MM-DD-HH.log``) every *interval* hours.  Expired data is
    cleaned up on the same cadence as *interval*.  Symlink/delete races from
    other processes are swallowed, so it tolerates multi-process use.
    """

    def __init__(
        self,
        filename,
        backupDays=7,
        interval=1,
        encoding="utf-8",
        delay=False,
        utc=False,
        **kwargs,
    ):
        """Initialize the handler.

        Args:
            filename (str): base log file path; daily sub-directories are
                created under its parent directory.
            backupDays (int): days of logs to retain (default 7).
            interval (int): hours between rotations; must divide 24 evenly
                (default 1).
            encoding (str): file encoding (default utf-8).
            delay (bool): defer opening the file until first emit (default False).
            utc (bool): use UTC instead of local time (default False).

        Raises:
            ValueError: if *interval* does not divide 24 evenly.
        """
        if 24 % interval != 0:
            raise ValueError("interval必须能被24整除")
        self.backup_days = backupDays
        self.interval = interval
        self.utc = utc
        self.base_path = Path(filename)
        self.current_day = self._get_current_day()
        self.current_hour = self._get_current_hour()
        self.current_dir = self._get_day_dir()
        self.current_filename = self._get_hourly_filename()
        self.current_filepath = self.current_dir / self.current_filename
        self.last_clean_time = 0  # start at 0 so the first rollover runs a cleanup pass
        self.seconds_per_hour = 3600
        # Make sure the day directory exists before the stream is opened.
        self.current_dir.mkdir(parents=True, exist_ok=True)
        BaseRotatingHandler.__init__(self, str(self.current_filepath), "a", encoding, delay)

    def _get_current_time(self):
        """Return the current time struct (UTC or local per ``self.utc``)."""
        return time.gmtime() if self.utc else time.localtime()

    def _get_current_day(self):
        """Return the current date string (YYYY-MM-DD)."""
        return time.strftime("%Y-%m-%d", self._get_current_time())

    def _get_current_hour(self):
        """Return the current hour (0-23) floored to the start of its interval slot."""
        current_hour = self._get_current_time().tm_hour
        return current_hour - (current_hour % self.interval)

    def _get_day_dir(self):
        """Return the directory path for the current day."""
        return self.base_path.parent / self.current_day

    def _get_hourly_filename(self):
        """Return the per-slot file name: ``<stem>_<day>-<hour>.log``."""
        prefix = self.base_path.stem
        hour_str = f"{self.current_hour:02d}"
        return f"{prefix}_{self.current_day}-{hour_str}.log"

    def shouldRollover(self, record):
        """Return True when the time slot changed or a cleanup pass is due.

        *record* is unused; rotation is purely time-driven.
        """
        now_day = self._get_current_day()
        now_hour = self._get_current_hour()
        # Roll when either the day or the interval slot has changed.
        if now_day != self.current_day or now_hour != self.current_hour:
            return True
        # Also roll when a cleanup pass is due (once per `interval` hours);
        # the file path may stay the same in that case.
        current_time = time.time()
        if current_time - self.last_clean_time > self.interval * self.seconds_per_hour:
            return True
        return False

    def doRollover(self):
        """Switch to the current slot's file and run cleanup when due."""
        if self.stream:
            self.stream.close()
            self.stream = None
        # Recompute the day/hour slot and the target file path.
        self.current_day = self._get_current_day()
        self.current_hour = self._get_current_hour()
        self.current_dir = self._get_day_dir()
        self.current_filename = self._get_hourly_filename()
        self.current_filepath = self.current_dir / self.current_filename
        # Create the new day directory if it does not exist yet.
        self.current_dir.mkdir(parents=True, exist_ok=True)
        # Reopen immediately unless delayed opening was requested.
        if not self.delay:
            self.stream = self._open()
        # Clean expired data at most once per `interval` hours.
        current_time = time.time()
        if current_time - self.last_clean_time > self.interval * self.seconds_per_hour:
            self._clean_expired_data()
            self.last_clean_time = current_time

    def _open(self):
        """Open the current log file and refresh the convenience symlink."""
        if self.encoding is None:
            stream = open(str(self.current_filepath), self.mode)
        else:
            stream = codecs.open(str(self.current_filepath), self.mode, self.encoding)
        # Keep a stable "current_<stem>.log" symlink pointing at the live file.
        self._create_symlink()
        return stream

    def _create_symlink(self):
        """Point ``current_<stem>.log`` at the file currently being written."""
        symlink_path = self.base_path.parent / f"current_{self.base_path.stem}.log"
        try:
            if symlink_path.exists():
                if symlink_path.is_symlink():
                    os.remove(str(symlink_path))
                else:
                    # A regular file is squatting on the link name; move it
                    # aside rather than deleting data.
                    backup_path = symlink_path.with_name(f"{symlink_path.stem}_backup.log")
                    os.rename(str(symlink_path), str(backup_path))
            # Use a relative target so the link survives tree relocation.
            rel_path = self.current_filepath.relative_to(self.base_path.parent)
            os.symlink(str(rel_path), str(symlink_path))
        except OSError:
            # Another process may race us on the link; best-effort only.
            pass

    def _clean_expired_data(self):
        """Delete day directories and stray hourly files older than
        ``backup_days`` days.  No-op when ``backup_days`` <= 0."""
        if self.backup_days <= 0:
            return
        cutoff_time = time.time() - (self.backup_days * 24 * self.seconds_per_hour)
        day_pattern = re.compile(r"^\d{4}-\d{2}-\d{2}$")
        file_pattern = re.compile(r"^.+_\d{4}-\d{2}-\d{2}-\d{2}\.log$")
        # Remove whole expired day directories (by directory mtime).
        for dir_name in os.listdir(str(self.base_path.parent)):
            dir_path = self.base_path.parent / dir_name
            if not dir_path.is_dir():
                continue
            if day_pattern.match(dir_name):
                try:
                    dir_mtime = os.path.getmtime(str(dir_path))
                    if dir_mtime < cutoff_time:
                        # Delete the directory's files first, then the directory.
                        for file in dir_path.glob("*"):
                            try:
                                file.unlink()
                            except OSError:
                                pass
                        dir_path.rmdir()
                except OSError:
                    pass
        # Also sweep expired hourly files sitting directly in the base dir.
        for file_name in os.listdir(str(self.base_path.parent)):
            file_path = self.base_path.parent / file_name
            if file_path.is_file() and file_pattern.match(file_name):
                try:
                    file_mtime = os.path.getmtime(str(file_path))
                    if file_mtime < cutoff_time:
                        file_path.unlink()
                except OSError:
                    pass
class LazyFileHandler(logging.Handler):
    """Handler that defers creating the underlying file handler until the
    first record is actually emitted, so empty log files are never created."""

    def __init__(self, filename, backupCount, level=logging.NOTSET, formatter=None):
        """Store the target file settings without opening anything yet."""
        super().__init__(level=level)
        self.filename = filename
        self.backupCount = backupCount
        self.formatter = formatter
        self._real_handler = None

    def create_real_handler(self):
        """Build and configure the real DailyRotatingFileHandler on first use."""
        real = DailyRotatingFileHandler(self.filename, backupCount=self.backupCount)
        real.setLevel(self.level)
        if self.formatter:
            real.setFormatter(self.formatter)
        return real

    def emit(self, record):
        """Forward *record* to the real handler, creating it on demand."""
        # Drop records below our threshold before touching the file handler.
        if record.levelno < self.level:
            return
        self.acquire()
        try:
            real = self._real_handler
            if real is None:
                real = self.create_real_handler()
                self._real_handler = real
        finally:
            self.release()
        # Hand the record to the materialized handler.
        real.emit(record)

    def close(self):
        """Close the real handler (if it was ever created), then ourselves."""
        if self._real_handler is not None:
            self._real_handler.close()
        super().close()
class DailyRotatingFileHandler(BaseRotatingHandler):
    """
    Like `logging.TimedRotatingFileHandler` with daily rotation, but usable
    from multiple processes: each process independently computes the dated
    file name and re-points a shared symlink (the base path) at it.
    """

    def __init__(
        self,
        filename,
        backupCount=0,
        encoding="utf-8",
        delay=False,
        utc=False,
        **kwargs,
    ):
        """Initialize the handler.

        Args:
            filename (str): base log path; dated files are created next to it
                as ``<name>.YYYY-MM-DD`` and *filename* becomes a symlink to
                the active one.
            backupCount (int, optional, default=0): number of dated backups
                to keep; 0 keeps everything.
            encoding (str, optional, default='utf-8'): file encoding.
            delay (bool, optional, default=False): defer opening the file
                until the first emit.
            utc (bool, optional, default=False): use UTC instead of local
                time when computing the dated file name.
            kwargs (dict, optional): accepted for signature compatibility;
                ignored.
        """
        self.backup_count = backupCount
        self.utc = utc
        self.suffix = "%Y-%m-%d"
        self.base_log_path = Path(filename)
        self.base_filename = self.base_log_path.name
        self.current_filename = self._compute_fn()
        self.current_log_path = self.base_log_path.with_name(self.current_filename)
        BaseRotatingHandler.__init__(self, filename, "a", encoding, delay)

    def shouldRollover(self, record):
        """Return True when the dated file name for "now" differs from the
        one currently open; *record* is unused."""
        return self.current_filename != self._compute_fn()

    def doRollover(self):
        """Close the old stream, switch to today's file, prune old backups."""
        if self.stream:
            self.stream.close()
            self.stream = None
        self.current_filename = self._compute_fn()
        self.current_log_path = self.base_log_path.with_name(self.current_filename)
        if not self.delay:
            self.stream = self._open()
        self.delete_expired_files()

    def _compute_fn(self):
        """Return the dated file name for the current day.

        Bug fix: honor ``self.utc`` — previously the flag was accepted but
        ``time.localtime()`` was always used.
        """
        now = time.gmtime() if self.utc else time.localtime()
        return self.base_filename + "." + time.strftime(self.suffix, now)

    def _open(self):
        """Open the dated log file and re-point the base-path symlink at it."""
        if self.encoding is None:
            stream = open(str(self.current_log_path), self.mode)
        else:
            stream = codecs.open(str(self.current_log_path), self.mode, self.encoding)
        # Replace the base path with a symlink to the active dated file so
        # `tail -f <filename>` keeps working across rotations.
        if self.base_log_path.exists():
            try:
                if not self.base_log_path.is_symlink() or os.readlink(self.base_log_path) != self.current_filename:
                    os.remove(self.base_log_path)
            except OSError:
                pass
        try:
            os.symlink(self.current_filename, str(self.base_log_path))
        except OSError:
            # Another process may have created the link first; best effort.
            pass
        return stream

    def delete_expired_files(self):
        """Delete dated backups beyond ``backup_count`` (0 disables pruning)."""
        if self.backup_count <= 0:
            return
        file_names = os.listdir(str(self.base_log_path.parent))
        result = []
        prefix = self.base_filename + "."
        plen = len(prefix)
        for file_name in file_names:
            if file_name[:plen] == prefix:
                suffix = file_name[plen:]
                # Accept "YYYY-MM-DD" plus an optional extra extension.
                if re.match(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", suffix):
                    result.append(file_name)
        if len(result) < self.backup_count:
            result = []
        else:
            # Date-formatted names sort chronologically; drop the oldest.
            result.sort()
            result = result[: len(result) - self.backup_count]
        for file_name in result:
            os.remove(str(self.base_log_path.with_name(file_name)))

161
fastdeploy/logger/logger.py Normal file
View File

@@ -0,0 +1,161 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
"""
日志模块:用于初始化和获取 FastDeploy 日志记录器。
本模块提供 get_logger 方法,统一管理各子模块的日志记录行为。
"""
import logging
import os
import threading
from pathlib import Path
from fastdeploy import envs
from fastdeploy.logger.formatters import ColoredFormatter
from fastdeploy.logger.handlers import DailyRotatingFileHandler, LazyFileHandler
from fastdeploy.logger.setup_logging import setup_logging
class FastDeployLogger:
    """Singleton facade that owns logging setup and logger retrieval.

    Supports both the new unified namespace ("fastdeploy.*", configured once
    via ``setup_logging``) and the legacy per-file-handler interface.
    """

    # Singleton state shared by every instantiation.
    _instance = None
    _initialized = False
    _lock = threading.RLock()

    def __new__(cls):
        """Double-checked-locking singleton: every call returns one instance."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def _initialize(self):
        """Initialize the logging system exactly once (thread-safe)."""
        with self._lock:
            if not self._initialized:
                setup_logging()
                self._initialized = True

    def get_logger(self, name, file_name=None, without_formater=False, print_to_console=False):
        """
        Get a logger (compatible with the original interface).

        Args:
            name: logger name.
            file_name: log file name (kept for backward compatibility).
            without_formater: skip attaching a formatter.
            print_to_console: also log to the console.
        """
        # With only a name given, use the new unified naming scheme.
        if file_name is None and not without_formater and not print_to_console:
            # Lazy initialization of the dictConfig-based setup.
            if not self._initialized:
                self._initialize()
            return self._get_unified_logger(name)
        # Otherwise fall back to the legacy per-file interface.
        return self._get_legacy_logger(name, file_name, without_formater, print_to_console)

    def _get_unified_logger(self, name):
        """
        New unified logger lookup: map *name* into the "fastdeploy" namespace.
        """
        if name is None:
            return logging.getLogger("fastdeploy")
        # Special-case __main__ so scripts get a stable, descriptive name.
        if name == "__main__":
            import __main__

            # Use the main module's file name when it has one.
            if hasattr(__main__, "__file__"):
                # Derive the stem of the entry script.
                base_name = Path(__main__.__file__).stem
                # Create a prefixed logger for it.
                return logging.getLogger(f"fastdeploy.main.{base_name}")
            return logging.getLogger("fastdeploy.main")
        # Already inside the fastdeploy namespace: use as-is.
        if name.startswith("fastdeploy.") or name == "fastdeploy":
            return logging.getLogger(name)
        else:
            # Otherwise prefix with "fastdeploy." to join the namespace.
            return logging.getLogger(f"fastdeploy.{name}")

    def _get_legacy_logger(self, name, file_name, without_formater=False, print_to_console=False):
        """
        Legacy logger lookup: one rotating file per *file_name*, plus a
        lazily-created "*_error.log" file for ERROR-and-above records.
        """
        log_dir = envs.FD_LOG_DIR
        if not os.path.exists(log_dir):
            os.makedirs(log_dir, exist_ok=True)
        is_debug = int(envs.FD_DEBUG)
        # logger = logging.getLogger(name)
        # Use a dedicated namespace for legacy loggers so they cannot collide
        # with (or be overwritten by) the unified "fastdeploy.*" loggers.
        legacy_name = f"legacy.{name}"
        logger = logging.getLogger(legacy_name)
        # Set the log level from the FD_DEBUG environment variable.
        if is_debug:
            logger.setLevel(level=logging.DEBUG)
        else:
            logger.setLevel(level=logging.INFO)
        # Build the shared formatter.
        formatter = ColoredFormatter(
            "%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s"
        )
        # Clear existing handlers (preserves the original behavior).
        for handler in logger.handlers[:]:
            logger.removeHandler(handler)
        # Main rotating log file handler.
        # NOTE(review): LOG_FILE is built from file_name BEFORE the ".log"
        # normalization below, so the main file and the error file can end up
        # with different stems for names like "a.b" — confirm this is intended.
        LOG_FILE = f"{log_dir}/{file_name}"
        backup_count = int(envs.FD_LOG_BACKUP_COUNT)
        # handler = LazyFileHandler(filename=LOG_FILE, backupCount=backup_count, level=hanlder_level)
        handler = DailyRotatingFileHandler(LOG_FILE, backupCount=backup_count)
        # ERROR log file handler (new feature): normalize file_name to ".log"
        # and derive "<stem>_error.log" from it.
        if not file_name.endswith(".log"):
            file_name = f"{file_name}.log" if "." not in file_name else file_name.split(".")[0] + ".log"
        ERROR_LOG_FILE = os.path.join(log_dir, file_name.replace(".log", "_error.log"))
        error_handler = LazyFileHandler(
            filename=ERROR_LOG_FILE, backupCount=backup_count, level=logging.ERROR, formatter=None
        )
        if not without_formater:
            handler.setFormatter(formatter)
            error_handler.setFormatter(formatter)
        # Attach the file handlers.
        logger.addHandler(handler)
        logger.addHandler(error_handler)
        # Optional console handler.
        if print_to_console:
            console_handler = logging.StreamHandler()
            if not without_formater:
                console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
            # NOTE(review): Handler objects have no `propagate` attribute, so
            # this line is a no-op — confirm whether `logger.propagate = False`
            # was intended instead.
            console_handler.propagate = False
        # Propagation left at the default (original logic kept commented out).
        # logger.propagate = False
        return logger

View File

@@ -0,0 +1,153 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
"""
配置日志系统
"""
import json
import logging
import logging.config
import os
from pathlib import Path
from fastdeploy import envs
def setup_logging(log_dir=None, config_file=None):
    """
    Configure FastDeploy's logging via ``logging.config.dictConfig``.

    Args:
        log_dir: directory for log files; when omitted, falls back to the
            FD_LOG_DIR environment setting (default "logs").
        config_file: path to a JSON dictConfig file; when omitted the
            built-in default configuration is used.

    Returns:
        logging.Logger: the "fastdeploy" package root logger.
    """
    # Run at most once per process.
    if getattr(setup_logging, "_configured", False):
        return logging.getLogger("fastdeploy")
    # Prefer the environment-provided directory over the argument/default.
    if log_dir is None:
        log_dir = getattr(envs, "FD_LOG_DIR", "logs")
    # Make sure the log directory exists.
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    # Log level and backup count come from environment variables.
    is_debug = int(getattr(envs, "FD_DEBUG", 0))
    FASTDEPLOY_LOGGING_LEVEL = "DEBUG" if is_debug else "INFO"
    backup_count = int(getattr(envs, "FD_LOG_BACKUP_COUNT", 7))
    # Shared log line format.
    _FORMAT = "%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s"
    # Built-in default configuration.
    default_config = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "standard": {
                "class": "logging.Formatter",
                "format": _FORMAT,
                "datefmt": "%Y-%m-%d %H:%M:%S",
            },
            "colored": {
                "class": "fastdeploy.logger.formatters.ColoredFormatter",
                "format": _FORMAT,
                "datefmt": "%Y-%m-%d %H:%M:%S",
            },
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": FASTDEPLOY_LOGGING_LEVEL,
                "formatter": "colored",
                "stream": "ext://sys.stdout",
            },
            # Default error log: keeps the latest hour, at log/error.log.
            "error_file": {
                "class": "logging.handlers.TimedRotatingFileHandler",
                "level": "ERROR",
                "formatter": "standard",
                "filename": os.path.join(log_dir, "error.log"),
                "when": "H",
                "interval": 1,
                "backupCount": 1,
            },
            # Full log: keeps the latest hour, at log/default.log.
            "default_file": {
                "class": "logging.handlers.TimedRotatingFileHandler",
                "level": FASTDEPLOY_LOGGING_LEVEL,
                "formatter": "standard",
                "filename": os.path.join(log_dir, "default.log"),
                "when": "H",
                "interval": 1,
                "backupCount": 1,
            },
            # Error archive: 7 days retained, one file per hour, e.g.
            # FastDeploy/log/2025-08-14/error_2025-08-14-18.log.
            "error_archive": {
                "class": "fastdeploy.logger.handlers.IntervalRotatingFileHandler",
                "level": "ERROR",
                "formatter": "standard",
                "filename": os.path.join(log_dir, "error.log"),
                "backupDays": 7,
                "interval": 1,
                "encoding": "utf-8",
            },
            # Full-log archive: 7 days retained, one file per hour, e.g.
            # FastDeploy/log/2025-08-14/default_2025-08-14-18.log.
            "default_archive": {
                "class": "fastdeploy.logger.handlers.IntervalRotatingFileHandler",
                "level": FASTDEPLOY_LOGGING_LEVEL,
                "formatter": "standard",
                "filename": os.path.join(log_dir, "default.log"),
                "backupDays": 7,
                "interval": 1,
                "encoding": "utf-8",
            },
        },
        "loggers": {
            # Package root logger, shared globally.
            "fastdeploy": {
                "level": "DEBUG",
                "handlers": ["error_file", "default_file", "error_archive", "default_archive"],
                "propagate": False,
            }
        },
    }
    # Load the user-supplied config file when provided.
    if config_file and os.path.exists(config_file):
        with open(config_file, "r", encoding="utf-8") as f:
            config = json.load(f)
        # Merge environment-derived settings into the user config; the
        # environment variables take precedence over the file's values.
        if "handlers" in config:
            for handler_name, handler_config in config["handlers"].items():
                if "backupCount" not in handler_config and "DailyRotating" in handler_config.get("class", ""):
                    handler_config["backupCount"] = backup_count
                if handler_config.get("level") == "INFO" and is_debug:
                    handler_config["level"] = "DEBUG"
    else:
        config = default_config
    # Apply the logging configuration.
    logging.config.dictConfig(config)
    # Mark as configured so repeated calls are no-ops.
    setup_logging._configured = True
    # Hand back the package root logger.
    return logging.getLogger("fastdeploy")

View File

@@ -40,6 +40,7 @@ from tqdm import tqdm
from typing_extensions import TypeIs, assert_never
from fastdeploy import envs
from fastdeploy.logger.logger import FastDeployLogger
T = TypeVar("T")
@@ -191,38 +192,38 @@ class DailyRotatingFileHandler(BaseRotatingHandler):
os.remove(str(self.base_log_path.with_name(file_name)))
def get_logger(name, file_name, without_formater=False, print_to_console=False):
"""
get logger
"""
log_dir = envs.FD_LOG_DIR
if not os.path.exists(log_dir):
os.mkdir(log_dir)
is_debug = int(envs.FD_DEBUG)
logger = logging.getLogger(name)
if is_debug:
logger.setLevel(level=logging.DEBUG)
else:
logger.setLevel(level=logging.INFO)
# def get_logger(name, file_name, without_formater=False, print_to_console=False):
# """
# get logger
# """
# log_dir = envs.FD_LOG_DIR
# if not os.path.exists(log_dir):
# os.mkdir(log_dir)
# is_debug = int(envs.FD_DEBUG)
# logger = logging.getLogger(name)
# if is_debug:
# logger.setLevel(level=logging.DEBUG)
# else:
# logger.setLevel(level=logging.INFO)
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# for handler in logger.handlers[:]:
# logger.removeHandler(handler)
LOG_FILE = f"{log_dir}/{file_name}"
backup_count = int(envs.FD_LOG_BACKUP_COUNT)
handler = DailyRotatingFileHandler(LOG_FILE, backupCount=backup_count)
formatter = ColoredFormatter("%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s")
# LOG_FILE = f"{log_dir}/{file_name}"
# backup_count = int(envs.FD_LOG_BACKUP_COUNT)
# handler = DailyRotatingFileHandler(LOG_FILE, backupCount=backup_count)
# formatter = ColoredFormatter("%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s")
console_handler = logging.StreamHandler()
if not without_formater:
handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
logger.addHandler(handler)
if print_to_console:
logger.addHandler(console_handler)
handler.propagate = False
console_handler.propagate = False
return logger
# console_handler = logging.StreamHandler()
# if not without_formater:
# handler.setFormatter(formatter)
# console_handler.setFormatter(formatter)
# logger.addHandler(handler)
# if print_to_console:
# logger.addHandler(console_handler)
# handler.propagate = False
# console_handler.propagate = False
# return logger
def str_to_datetime(date_string):
@@ -736,6 +737,12 @@ class StatefulSemaphore:
}
# Global access point for logging (keeps the original call style working).
def get_logger(name, file_name=None, without_formater=False, print_to_console=False):
    """Module-level wrapper kept for backward compatibility; delegates to the
    FastDeployLogger singleton."""
    return FastDeployLogger().get_logger(name, file_name, without_formater, print_to_console)


# Shared logger instances used across the code base.
llm_logger = get_logger("fastdeploy", "fastdeploy.log")
data_processor_logger = get_logger("data_processor", "data_processor.log")
scheduler_logger = get_logger("scheduler", "scheduler.log")

View File

@@ -0,0 +1,103 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import unittest
from fastdeploy.logger.formatters import ColoredFormatter
class TestColoredFormatter(unittest.TestCase):
    """Unit tests for the ColoredFormatter class."""

    def setUp(self):
        """Create a fresh formatter before each test."""
        self.formatter = ColoredFormatter("%(levelname)s - %(message)s")

    def _render(self, level, msg, levelname=None):
        """Format a synthetic record at *level* and return the result."""
        record = logging.LogRecord(
            name="test", level=level, pathname="", lineno=0, msg=msg, args=(), exc_info=None
        )
        if levelname is not None:
            record.levelname = levelname
        return self.formatter.format(record)

    def test_color_codes_definition(self):
        """COLOR_CODES maps WARNING to yellow and ERROR/CRITICAL to red."""
        self.assertEqual(
            self.formatter.COLOR_CODES,
            {logging.WARNING: 33, logging.ERROR: 31, logging.CRITICAL: 31},
        )

    def test_format_warning_message(self):
        """WARNING records are wrapped in yellow."""
        self.assertEqual(
            self._render(logging.WARNING, "This is a warning"),
            "\033[33mWARNING - This is a warning\033[0m",
        )

    def test_format_error_message(self):
        """ERROR records are wrapped in red."""
        self.assertEqual(
            self._render(logging.ERROR, "This is an error"),
            "\033[31mERROR - This is an error\033[0m",
        )

    def test_format_critical_message(self):
        """CRITICAL records are wrapped in red."""
        self.assertEqual(
            self._render(logging.CRITICAL, "This is critical"),
            "\033[31mCRITICAL - This is critical\033[0m",
        )

    def test_format_info_message(self):
        """INFO records are left uncolored."""
        self.assertEqual(self._render(logging.INFO, "This is info"), "INFO - This is info")

    def test_format_debug_message(self):
        """DEBUG records are left uncolored."""
        self.assertEqual(self._render(logging.DEBUG, "This is debug"), "DEBUG - This is debug")

    def test_format_custom_level(self):
        """Levels outside the color map (here 25, between INFO and WARNING)
        are left uncolored."""
        self.assertEqual(
            self._render(25, "This is custom level", levelname="CUSTOM"),
            "CUSTOM - This is custom level",
        )


if __name__ == "__main__":
    unittest.main(verbosity=2)

View File

@@ -0,0 +1,305 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shutil
import tempfile
import time
import unittest
from datetime import datetime, timedelta
from logging import INFO, LogRecord, getLogger
from pathlib import Path
from unittest.mock import MagicMock, patch
from fastdeploy.logger.handlers import (
DailyRotatingFileHandler,
IntervalRotatingFileHandler,
LazyFileHandler,
)
class TestIntervalRotatingFileHandler(unittest.TestCase):
    """Tests for IntervalRotatingFileHandler (day directories + hourly files)."""

    def setUp(self):
        # Fresh temporary directory per test.
        self.temp_dir = tempfile.mkdtemp()
        self.base_filename = os.path.join(self.temp_dir, "test.log")

    def tearDown(self):
        # Remove the temporary directory.
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_initialization(self):
        """Constructor validates `interval` and stores the settings."""
        # interval must divide 24 evenly; 7 does not.
        with self.assertRaises(ValueError):
            IntervalRotatingFileHandler(self.base_filename, interval=7)
        # Valid construction.
        handler = IntervalRotatingFileHandler(self.base_filename, interval=6, backupDays=3)
        self.assertEqual(handler.interval, 6)
        self.assertEqual(handler.backup_days, 3)
        handler.close()

    def test_file_rotation(self):
        """Emitting a record creates the day dir, hourly file and symlink."""
        handler = IntervalRotatingFileHandler(self.base_filename, interval=6, backupDays=1)
        initial_day = handler.current_day
        initial_hour = handler.current_hour
        # First write.
        record = LogRecord("test", 20, "/path", 1, "Test message", [], None)
        handler.emit(record)
        # The hourly file must exist inside the day directory.
        expected_dir = Path(self.temp_dir) / initial_day
        expected_file = f"test_{initial_day}-{initial_hour:02d}.log"
        self.assertTrue((expected_dir / expected_file).exists())
        # And the convenience symlink must point at it.
        symlink = Path(self.temp_dir) / "current_test.log"
        self.assertTrue(symlink.is_symlink())
        handler.close()

    def test_time_based_rollover(self):
        """shouldRollover fires when the hour or the day changes."""
        handler = IntervalRotatingFileHandler(self.base_filename, interval=1, backupDays=1)
        # Pin a known starting slot.
        handler.current_day = "2000-01-01"
        handler.current_hour = 0
        # Fixed: use the `patch` imported at the top of this file instead of
        # `unittest.mock.patch`, which only worked because the from-import
        # happens to bind `mock` as an attribute of the `unittest` package.
        with patch.object(handler, "_get_current_day", return_value="2000-01-01"):
            with patch.object(handler, "_get_current_hour", return_value=1):
                self.assertTrue(handler.shouldRollover(None))
        # Day change triggers a rollover as well.
        with patch.object(handler, "_get_current_day", return_value="2000-01-02"):
            with patch.object(handler, "_get_current_hour", return_value=0):
                self.assertTrue(handler.shouldRollover(None))
        handler.close()

    def test_cleanup_logic(self):
        """_clean_expired_data removes day directories older than backupDays."""
        # Use a fixed test time.
        test_time = datetime(2023, 1, 1, 12, 0)
        with patch("time.time", return_value=time.mktime(test_time.timetuple())):
            # Fixed: backupDays=0 disables cleanup entirely (the method
            # early-returns), so the original assertion could only pass via a
            # force-delete fallback.  backupDays=1 makes a 2-day-old directory
            # genuinely expire.
            handler = IntervalRotatingFileHandler(self.base_filename, interval=1, backupDays=1)
            # Build an expired day directory with one hourly file inside.
            old_day = (test_time - timedelta(days=2)).strftime("%Y-%m-%d")
            old_dir = Path(self.temp_dir) / old_day
            old_dir.mkdir()
            old_file = old_dir / f"test_{old_day}-00.log"
            old_file.write_text("test content")
            # Back-date the directory and file mtimes past the cutoff.
            old_time = time.mktime((test_time - timedelta(days=2)).timetuple())
            os.utime(str(old_dir), (old_time, old_time))
            os.utime(str(old_file), (old_time, old_time))
            self.assertTrue(old_file.exists())
            # Run the cleanup and verify the expired directory is gone.
            handler._clean_expired_data()
            self.assertFalse(old_dir.exists(), f"Directory {old_dir} should have been deleted")
            handler.close()

    def test_multi_interval(self):
        """current_hour is floored to the start of each interval slot."""
        for interval in [1, 2, 3, 4, 6, 8, 12, 24]:
            with self.subTest(interval=interval):
                handler = IntervalRotatingFileHandler(self.base_filename, interval=interval)
                current_hour = handler._get_current_time().tm_hour
                expected_hour = current_hour - (current_hour % interval)
                self.assertEqual(handler.current_hour, expected_hour)
                handler.close()

    def test_utc_mode(self):
        """utc=True derives the day folder from gmtime."""
        handler = IntervalRotatingFileHandler(self.base_filename, utc=True)
        self.assertTrue(time.strftime("%Y-%m-%d", time.gmtime()).startswith(handler.current_day))
        handler.close()

    def test_symlink_creation(self):
        """doRollover re-points the current_test.log symlink at the new file."""
        handler = IntervalRotatingFileHandler(self.base_filename)
        symlink = Path(self.temp_dir) / "current_test.log"
        initial_target = os.readlink(str(symlink))
        # Force a rollover by faking a different day/hour slot.
        with patch.object(handler, "_get_current_day", return_value="2000-01-01"):
            with patch.object(handler, "_get_current_hour", return_value=12):
                handler.doRollover()
        new_target = os.readlink(str(symlink))
        # The link must now point at the new slot's file.
        self.assertNotEqual(initial_target, new_target)
        self.assertIn("2000-01-01/test_2000-01-01-12.log", new_target)
        handler.close()
class TestDailyRotatingFileHandler(unittest.TestCase):
    """Tests for DailyRotatingFileHandler."""

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="fd_handler_test_")

    def tearDown(self):
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_daily_rotation(self):
        """Rolling over to the next day's file keeps logging working."""
        log_file = os.path.join(self.temp_dir, "test.log")
        handler = DailyRotatingFileHandler(log_file, backupCount=3)
        logger = getLogger("test_daily_rotation")
        logger.addHandler(handler)
        logger.setLevel(INFO)
        # Day 1: write one record.
        logger.info("Test log message day 1")
        handler.flush()
        # Simulate the clock crossing midnight into tomorrow.
        tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
        with patch.object(handler, "_compute_fn", return_value=f"test.log.{tomorrow}"):
            # Trigger the rollover check manually.
            if handler.shouldRollover(MagicMock()):
                handler.doRollover()
            # Day 2: write another record.
            logger.info("Test log message day 2")
            handler.flush()
        handler.close()
        # At least one of the possible log files must exist.
        today = datetime.now().strftime("%Y-%m-%d")
        candidates = [
            os.path.join(self.temp_dir, "test.log"),
            os.path.join(self.temp_dir, f"test.log.{today}"),
            os.path.join(self.temp_dir, f"test.log.{tomorrow}"),
        ]
        self.assertTrue(
            any(os.path.isfile(path) for path in candidates),
            f"No log files found in {self.temp_dir}",
        )

    def test_backup_count(self):
        """delete_expired_files keeps at most backupCount dated backups."""
        log_file = os.path.join(self.temp_dir, "test.log")
        handler = DailyRotatingFileHandler(log_file, backupCount=2)
        logger = getLogger("test_backup_count")
        logger.addHandler(handler)
        logger.setLevel(INFO)
        # Seed five days' worth of dated log files directly on disk.
        base_date = datetime.now()
        for offset in range(5):
            date_str = (base_date - timedelta(days=offset)).strftime("%Y-%m-%d")
            with open(os.path.join(self.temp_dir, f"test.log.{date_str}"), "w") as f:
                f.write(f"Test log for {date_str}\n")
        # Trigger the pruning.
        handler.delete_expired_files()
        handler.close()
        log_files = [f for f in os.listdir(self.temp_dir) if f.startswith("test.log.")]
        print(f"Log files found: {log_files}")  # debug output
        # backupCount=2 -> at most 2 backups plus the current day's file.
        self.assertLessEqual(len(log_files), 3)
class TestLazyFileHandler(unittest.TestCase):
    """Tests for LazyFileHandler: the wrapped handler is created only on first emit."""

    def setUp(self):
        # Self-cleaning temporary directory holding the log file.
        self.tmpdir = tempfile.TemporaryDirectory()
        self.logfile = Path(self.tmpdir.name) / "test.log"

    def tearDown(self):
        self.tmpdir.cleanup()

    def test_lazy_initialization_and_write(self):
        """No underlying handler exists until a record is emitted; afterwards the file holds the message."""
        test_logger = logging.getLogger("test_lazy")
        test_logger.setLevel(logging.DEBUG)

        # Attach the lazy handler; nothing should be opened yet.
        lazy = LazyFileHandler(str(self.logfile), backupCount=3, level=logging.DEBUG)
        test_logger.addHandler(lazy)
        self.assertIsNone(lazy._real_handler)

        # Emitting one record must materialize the real handler.
        test_logger.info("Hello Lazy Handler")
        self.assertIsNotNone(lazy._real_handler)

        # The backing file must now exist and contain the emitted message.
        self.assertTrue(self.logfile.exists())
        with open(self.logfile, "r") as f:
            written = f.read()
        self.assertIn("Hello Lazy Handler", written)

        # Tidy up: close and detach the handler.
        lazy.close()
        test_logger.removeHandler(lazy)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,81 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import shutil
import tempfile
import unittest
from unittest.mock import patch
from fastdeploy.logger.logger import FastDeployLogger
class LoggerTests(unittest.TestCase):
    """Exercise FastDeployLogger's internal helpers through a real instance."""

    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp(prefix="fd_unittest_")
        # Pin the environment knobs the logger reads during these tests.
        self.env_patchers = [
            patch("fastdeploy.envs.FD_LOG_DIR", self.tmp_dir),
            patch("fastdeploy.envs.FD_DEBUG", "0"),
            patch("fastdeploy.envs.FD_LOG_BACKUP_COUNT", "1"),
        ]
        for patcher in self.env_patchers:
            patcher.start()
        # Instance under test.
        self.logger = FastDeployLogger()

    def tearDown(self):
        for patcher in self.env_patchers:
            patcher.stop()
        shutil.rmtree(self.tmp_dir, ignore_errors=True)

    def test_unified_logger(self):
        """_get_unified_logger maps arbitrary names into the fastdeploy namespace."""
        cases = [
            (None, "fastdeploy"),
            ("module", "fastdeploy.module"),
            ("fastdeploy.utils", "fastdeploy.utils"),
        ]
        for given, expected in cases:
            with self.subTest(name=given):
                unified = self.logger._get_unified_logger(given)
                self.assertEqual(unified.name, expected)

    def test_main_module_handling(self):
        """'__main__' is rewritten using the script's file stem."""
        with patch("__main__.__file__", "/path/to/test_script.py"):
            unified = self.logger._get_unified_logger("__main__")
            self.assertEqual(unified.name, "fastdeploy.main.test_script")

    def test_legacy_logger_creation(self):
        """_get_legacy_logger produces a namespaced INFO logger with three handlers."""
        legacy = self.logger._get_legacy_logger(
            "test", "test.log", without_formater=False, print_to_console=True
        )
        # Basic properties.
        self.assertTrue(legacy.name.startswith("legacy."))
        self.assertEqual(legacy.level, logging.INFO)
        # Handlers: file + error file + console.
        self.assertEqual(len(legacy.handlers), 3)

    def test_logger_propagate(self):
        """Legacy loggers keep propagation enabled."""
        legacy = self.logger._get_legacy_logger("test", "test.log")
        self.assertTrue(legacy.propagate)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)

View File

@@ -0,0 +1,134 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from fastdeploy.logger.setup_logging import setup_logging
class TestSetupLogging(unittest.TestCase):
    """Tests for setup_logging(): directory creation, formatters, levels,
    custom config loading and configure-once behavior."""

    # -------------------------------------------------
    # Fixture: a private temporary directory per test
    # -------------------------------------------------
    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="logger_setup_test_")
        # Patch the environment knobs uniformly for every test.
        self.patches = [
            patch("fastdeploy.envs.FD_LOG_DIR", self.temp_dir),
            patch("fastdeploy.envs.FD_DEBUG", "0"),
            patch("fastdeploy.envs.FD_LOG_BACKUP_COUNT", "3"),
        ]
        # Plain loop, not a comprehension: start() is called for its side effect.
        for p in self.patches:
            p.start()

    def tearDown(self):
        for p in self.patches:
            p.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)
        # Drop the configure-once marker so later tests reconfigure from scratch.
        if hasattr(setup_logging, "_configured"):
            delattr(setup_logging, "_configured")

    # -------------------------------------------------
    # Basics: the log directory is created automatically
    # -------------------------------------------------
    def test_log_dir_created(self):
        nested = os.path.join(self.temp_dir, "a", "b", "c")
        setup_logging(log_dir=nested)
        self.assertTrue(Path(nested).is_dir())

    # -------------------------------------------------
    # Default config: file handler emits no ANSI colors
    # -------------------------------------------------
    def test_default_config_file_no_ansi(self):
        setup_logging()
        logger = logging.getLogger("fastdeploy")
        logger.error("test ansi")
        default_file = Path(self.temp_dir) / "default.log"
        self.assertTrue(default_file.exists())
        with default_file.open() as f:
            content = f.read()
        # ANSI escape sequences must never reach the file.
        self.assertNotIn("\033[", content)

    # -------------------------------------------------
    # Debug-level switch
    # -------------------------------------------------
    def test_debug_level(self):
        with patch("fastdeploy.envs.FD_DEBUG", "1"):
            setup_logging()
            logger = logging.getLogger("fastdeploy")
            self.assertEqual(logger.level, logging.DEBUG)
            # Debug messages should reach the file as well.
            logger.debug("debug msg")
            default_file = Path(self.temp_dir) / "default.log"
            self.assertIn("debug msg", default_file.read_text())

    # -------------------------------------------------
    # Loading a custom JSON config file
    # -------------------------------------------------
    def test_custom_config_file(self):
        custom_cfg = {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {"plain": {"format": "%(message)s"}},
            "handlers": {
                "custom": {
                    "class": "logging.FileHandler",
                    "filename": os.path.join(self.temp_dir, "custom.log"),
                    "formatter": "plain",
                }
            },
            "loggers": {"fastdeploy": {"handlers": ["custom"], "level": "INFO"}},
        }
        cfg_path = Path(self.temp_dir) / "cfg.json"
        cfg_path.write_text(json.dumps(custom_cfg))
        setup_logging(config_file=str(cfg_path))
        logger = logging.getLogger("fastdeploy")
        logger.info("from custom cfg")
        custom_file = Path(self.temp_dir) / "custom.log"
        self.assertEqual(custom_file.read_text().strip(), "from custom cfg")

    # -------------------------------------------------
    # Calling setup_logging twice must not reconfigure
    # -------------------------------------------------
    def test_configure_once(self):
        logger1 = setup_logging()
        logger2 = setup_logging()
        self.assertIs(logger1, logger2)

    # -------------------------------------------------
    # The console handler uses ColoredFormatter
    # -------------------------------------------------
    @patch("logging.StreamHandler.emit")
    def test_console_colored(self, mock_emit):
        setup_logging()
        logger = logging.getLogger("fastdeploy")
        logger.error("color test")
        # It suffices that the console handler emitted something.
        self.assertTrue(mock_emit.called)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()