Files
g0v0-server/app/log.py

219 lines
6.8 KiB
Python

from __future__ import annotations
import http
import inspect
import logging
import re
from sys import stdout
from types import FunctionType
from typing import TYPE_CHECKING
from app.config import settings
from app.utils import snake_to_pascal
import loguru
if TYPE_CHECKING:
from loguru import Logger
logger: "Logger" = loguru.logger
class InterceptHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
# Get corresponding Loguru level if it exists.
try:
level: str | int = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# Find caller from where originated the logged message.
frame, depth = inspect.currentframe(), 0
while frame:
filename = frame.f_code.co_filename
is_logging = filename == logging.__file__
is_frozen = "importlib" in filename and "_bootstrap" in filename
if depth > 0 and not (is_logging or is_frozen):
break
frame = frame.f_back
depth += 1
message = record.getMessage()
if record.name == "uvicorn.access":
message = self._format_uvicorn_access_log(message)
color = True
elif record.name == "uvicorn.error":
message = self._format_uvicorn_error_log(message)
color = True
else:
color = False
logger.opt(depth=depth, exception=record.exc_info, colors=color).log(level, message)
def _format_uvicorn_error_log(self, message: str) -> str:
websocket_pattern = r'(\d+\.\d+\.\d+\.\d+:\d+)\s*-\s*"WebSocket\s+([^"]+)"\s+([\w\[\]]+)'
websocket_match = re.search(websocket_pattern, message)
if websocket_match:
ip, path, status = websocket_match.groups()
colored_ip = f"<cyan>{ip}</cyan>"
status_colors = {
"[accepted]": "<green>[accepted]</green>",
"403": "<red>403 [rejected]</red>",
}
colored_status = status_colors.get(status.lower(), f"<white>{status}</white>")
return f'{colored_ip} - "<bold><magenta>WebSocket</magenta> {path}</bold>" {colored_status}'
else:
return message
def _format_uvicorn_access_log(self, message: str) -> str:
http_pattern = r'(\d+\.\d+\.\d+\.\d+:\d+)\s*-\s*"(\w+)\s+([^"]+)"\s+(\d+)'
http_match = re.search(http_pattern, message)
if http_match:
ip, method, path, status_code = http_match.groups()
try:
status_phrase = http.HTTPStatus(int(status_code)).phrase
except ValueError:
status_phrase = ""
colored_ip = f"<cyan>{ip}</cyan>"
method_colors = {
"GET": "<green>GET</green>",
"POST": "<blue>POST</blue>",
"PUT": "<yellow>PUT</yellow>",
"DELETE": "<red>DELETE</red>",
"PATCH": "<magenta>PATCH</magenta>",
"OPTIONS": "<white>OPTIONS</white>",
"HEAD": "<white>HEAD</white>",
}
colored_method = method_colors.get(method, f"<white>{method}</white>")
status = int(status_code)
status_color = "white"
if 200 <= status < 300:
status_color = "green"
elif 300 <= status < 400:
status_color = "yellow"
elif 400 <= status < 500:
status_color = "red"
elif 500 <= status < 600:
status_color = "red"
return (
f'{colored_ip} - "<bold>{colored_method} '
f'{path}</bold>" '
f"<{status_color}>{status_code} {status_phrase}</{status_color}>"
)
return message
def get_caller_class_name(module_prefix: str = "", just_last_part: bool = True) -> str | None:
"""获取调用类名/模块名,仅对指定模块前缀生效"""
stack = inspect.stack()
for frame_info in stack[2:]:
module = frame_info.frame.f_globals.get("__name__", "")
if module_prefix and not module.startswith(module_prefix):
continue
local_vars = frame_info.frame.f_locals
# 实例方法
if "self" in local_vars:
return local_vars["self"].__class__.__name__
# 类方法
if "cls" in local_vars:
return local_vars["cls"].__name__
# 静态方法 / 普通函数 -> 尝试通过函数名匹配类
func_name = frame_info.function
for obj_name, obj in frame_info.frame.f_globals.items():
if isinstance(obj, type): # 遍历模块内类
cls = obj
attr = getattr(cls, func_name, None)
if isinstance(attr, (staticmethod, classmethod, FunctionType)):
return cls.__name__
# 如果没找到类,返回模块名
if just_last_part:
return module.rsplit(".", 1)[-1]
return module
return None
def service_logger(name: str) -> Logger:
return logger.bind(service=name)
def fetcher_logger(name: str) -> Logger:
return logger.bind(fetcher=name)
def task_logger(name: str) -> Logger:
return logger.bind(task=name)
def system_logger(name: str) -> Logger:
return logger.bind(system=name)
def dynamic_format(record):
prefix = ""
fetcher = record["extra"].get("fetcher")
if not fetcher:
fetcher = get_caller_class_name("app.fetcher")
if fetcher:
prefix = f"<magenta>[{fetcher}]</magenta> "
service = record["extra"].get("service")
if not service:
service = get_caller_class_name("app.service")
if service:
prefix = f"<blue>[{service}]</blue> "
task = record["extra"].get("task")
if not task:
task = get_caller_class_name("app.tasks")
if task:
task = snake_to_pascal(task)
prefix = f"<green>[{task}]</green> "
return f"<green>{{time:YYYY-MM-DD HH:mm:ss}}</green> [<level>{{level}}</level>] | {prefix}{{message}}\n"
logger.remove()
logger.add(
stdout,
colorize=True,
format=dynamic_format,
level=settings.log_level,
diagnose=settings.debug,
)
logger.add(
"logs/{time:YYYY-MM-DD}.log",
rotation="00:00",
retention="30 days",
colorize=False,
format="{time:YYYY-MM-DD HH:mm:ss} {level} | {message}",
level=settings.log_level,
diagnose=settings.debug,
encoding="utf8",
)
logging.basicConfig(handlers=[InterceptHandler()], level=settings.log_level, force=True)
uvicorn_loggers = [
"uvicorn",
"uvicorn.error",
"uvicorn.access",
"fastapi",
]
for logger_name in uvicorn_loggers:
uvicorn_logger = logging.getLogger(logger_name)
uvicorn_logger.handlers = [InterceptHandler()]
uvicorn_logger.propagate = False
logging.getLogger("httpx").setLevel("WARNING")
logging.getLogger("apscheduler").setLevel("WARNING")