import http import inspect import logging import re from sys import stdout from types import FunctionType from typing import TYPE_CHECKING from app.config import settings from app.utils import snake_to_pascal import loguru if TYPE_CHECKING: from loguru import Logger logger: "Logger" = loguru.logger class InterceptHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: # Get corresponding Loguru level if it exists. try: level: str | int = logger.level(record.levelname).name except ValueError: level = record.levelno # Find caller from where originated the logged message. frame, depth = inspect.currentframe(), 0 while frame: filename = frame.f_code.co_filename is_logging = filename == logging.__file__ is_frozen = "importlib" in filename and "_bootstrap" in filename if depth > 0 and not (is_logging or is_frozen): break frame = frame.f_back depth += 1 message = record.getMessage() _logger = logger if record.name == "uvicorn.access": message = self._format_uvicorn_access_log(message) color = True _logger = uvicorn_logger() elif record.name == "uvicorn.error": message = self._format_uvicorn_error_log(message) _logger = uvicorn_logger() color = True else: color = False _logger.opt(depth=depth, exception=record.exc_info, colors=color).log(level, message) def _format_uvicorn_error_log(self, message: str) -> str: websocket_pattern = r'(\d+\.\d+\.\d+\.\d+:\d+)\s*-\s*"WebSocket\s+([^"]+)"\s+([\w\[\]]+)' websocket_match = re.search(websocket_pattern, message) if websocket_match: ip, path, status = websocket_match.groups() colored_ip = f"{ip}" status_colors = { "[accepted]": "[accepted]", "403": "403 [rejected]", } colored_status = status_colors.get(status.lower(), f"{status}") return f'{colored_ip} - "WebSocket {path}" {colored_status}' else: return message def _format_uvicorn_access_log(self, message: str) -> str: http_pattern = r'(\d+\.\d+\.\d+\.\d+:\d+)\s*-\s*"(\w+)\s+([^"]+)"\s+(\d+)' http_match = re.search(http_pattern, message) if http_match: ip, method, path, status_code = http_match.groups() try: status_phrase = http.HTTPStatus(int(status_code)).phrase except ValueError: status_phrase = "" colored_ip = f"{ip}" method_colors = { "GET": "GET", "POST": "POST", "PUT": "PUT", "DELETE": "DELETE", "PATCH": "PATCH", "OPTIONS": "OPTIONS", "HEAD": "HEAD", } colored_method = method_colors.get(method, f"{method}") status = int(status_code) status_color = "white" if 200 <= status < 300: status_color = "green" elif 300 <= status < 400: status_color = "yellow" elif 400 <= status < 500 or 500 <= status < 600: status_color = "red" return ( f'{colored_ip} - "{colored_method} ' f'{path}" ' f"<{status_color}>{status_code} {status_phrase}" ) return message def get_caller_class_name(module_prefix: str = "", just_last_part: bool = True) -> str | None: stack = inspect.stack() for frame_info in stack[2:]: module = frame_info.frame.f_globals.get("__name__", "") if module_prefix and not module.startswith(module_prefix): continue local_vars = frame_info.frame.f_locals # 实例方法 if "self" in local_vars: return local_vars["self"].__class__.__name__ # 类方法 if "cls" in local_vars: return local_vars["cls"].__name__ # 静态方法 / 普通函数 -> 尝试通过函数名匹配类 func_name = frame_info.function for obj_name, obj in frame_info.frame.f_globals.items(): if isinstance(obj, type): # 遍历模块内类 cls = obj attr = getattr(cls, func_name, None) if isinstance(attr, (staticmethod, classmethod, FunctionType)): return cls.__name__ # 如果没找到类,返回模块名 if just_last_part: return module.rsplit(".", 1)[-1] return module return None def service_logger(name: str) -> "Logger": return logger.bind(service=name) def fetcher_logger(name: str) -> "Logger": return logger.bind(fetcher=name) def task_logger(name: str) -> "Logger": return logger.bind(task=name) def system_logger(name: str) -> "Logger": return logger.bind(system=name) def uvicorn_logger() -> "Logger": return logger.bind(uvicorn="Uvicorn") def log(name: str) -> "Logger": return logger.bind(real_name=name) def dynamic_format(record): name = "" uvicorn = record["extra"].get("uvicorn") if uvicorn: name = f"{uvicorn}" service = record["extra"].get("service") if not service: service = get_caller_class_name("app.service") if service: name = f"{service}" fetcher = record["extra"].get("fetcher") if not fetcher: fetcher = get_caller_class_name("app.fetcher") if fetcher: name = f"{fetcher}" task = record["extra"].get("task") if not task: task = get_caller_class_name("app.tasks") if task: task = snake_to_pascal(task) name = f"{task}" system = record["extra"].get("system") if system: name = f"{system}" if name == "": real_name = record["extra"].get("real_name", "") or record["name"] name = f"{real_name}" format = f"{{time:YYYY-MM-DD HH:mm:ss}} [{{level}}] | {name} | {{message}}\n" if record["exception"]: format += "{exception}\n" return format logger.remove() logger.add( stdout, colorize=True, format=dynamic_format, level=settings.log_level, diagnose=settings.debug, ) logger.add( "logs/{time:YYYY-MM-DD}.log", rotation="00:00", retention="30 days", colorize=False, format=dynamic_format, level=settings.log_level, diagnose=settings.debug, encoding="utf8", ) logging.basicConfig(handlers=[InterceptHandler()], level=settings.log_level, force=True) uvicorn_loggers = [ "uvicorn", "uvicorn.error", "uvicorn.access", "fastapi", ] for logger_name in uvicorn_loggers: _uvicorn_logger = logging.getLogger(logger_name) _uvicorn_logger.handlers = [InterceptHandler()] _uvicorn_logger.propagate = False logging.getLogger("httpx").setLevel("WARNING") logging.getLogger("apscheduler").setLevel("WARNING")