From afd5018bcd35359e13d6af9bc25f1683eb4e1e34 Mon Sep 17 00:00:00 2001 From: MingxuanGame Date: Fri, 3 Oct 2025 08:22:41 +0000 Subject: [PATCH] refactor(log): add prefix for fetcher and services --- app/fetcher/beatmap.py | 6 ++- app/fetcher/beatmap_raw.py | 7 ++- app/fetcher/beatmapset.py | 62 ++++++++-------------- app/log.py | 59 ++++++++++++++++++++- app/service/audio_proxy_service.py | 3 +- app/service/beatmapset_update_service.py | 67 +++++++----------------- app/service/verification_service.py | 6 +-- 7 files changed, 112 insertions(+), 98 deletions(-) diff --git a/app/fetcher/beatmap.py b/app/fetcher/beatmap.py index fa49cf4..3909572 100644 --- a/app/fetcher/beatmap.py +++ b/app/fetcher/beatmap.py @@ -1,10 +1,12 @@ from __future__ import annotations from app.database.beatmap import BeatmapResp -from app.log import logger +from app.log import fetcher_logger from ._base import BaseFetcher +logger = fetcher_logger("BeatmapFetcher") + class BeatmapFetcher(BaseFetcher): async def get_beatmap(self, beatmap_id: int | None = None, beatmap_checksum: str | None = None) -> BeatmapResp: @@ -14,7 +16,7 @@ class BeatmapFetcher(BaseFetcher): params = {"checksum": beatmap_checksum} else: raise ValueError("Either beatmap_id or beatmap_checksum must be provided.") - logger.opt(colors=True).debug(f"[BeatmapFetcher] get_beatmap: {params}") + logger.opt(colors=True).debug(f"get_beatmap: {params}") return BeatmapResp.model_validate( await self.request_api( diff --git a/app/fetcher/beatmap_raw.py b/app/fetcher/beatmap_raw.py index ccb19b8..bdf1f90 100644 --- a/app/fetcher/beatmap_raw.py +++ b/app/fetcher/beatmap_raw.py @@ -1,10 +1,11 @@ from __future__ import annotations +from app.log import fetcher_logger + from ._base import BaseFetcher from httpx import AsyncClient, HTTPError from httpx._models import Response -from loguru import logger import redis.asyncio as redis urls = [ @@ -13,12 +14,14 @@ urls = [ "https://catboy.best/osu/{beatmap_id}", ] +logger = fetcher_logger("BeatmapRawFetcher") + class BeatmapRawFetcher(BaseFetcher): async def get_beatmap_raw(self, beatmap_id: int) -> str: for url in urls: req_url = url.format(beatmap_id=beatmap_id) - logger.opt(colors=True).debug(f"[BeatmapRawFetcher] get_beatmap_raw: {req_url}") + logger.opt(colors=True).debug(f"get_beatmap_raw: {req_url}") resp = await self._request(req_url) if resp.status_code >= 400: continue diff --git a/app/fetcher/beatmapset.py b/app/fetcher/beatmapset.py index 80deb82..8801fef 100644 --- a/app/fetcher/beatmapset.py +++ b/app/fetcher/beatmapset.py @@ -7,7 +7,7 @@ import json from app.database.beatmapset import BeatmapsetResp, SearchBeatmapsetsResp from app.helpers.rate_limiter import osu_api_rate_limiter -from app.log import logger +from app.log import fetcher_logger from app.models.beatmap import SearchQueryModel from app.models.model import Cursor from app.utils import bg_tasks @@ -24,6 +24,9 @@ class RateLimitError(Exception): pass +logger = fetcher_logger("BeatmapsetFetcher") + + class BeatmapsetFetcher(BaseFetcher): @staticmethod def _get_homepage_queries() -> list[tuple[SearchQueryModel, Cursor]]: @@ -135,7 +138,7 @@ class BeatmapsetFetcher(BaseFetcher): return {} async def get_beatmapset(self, beatmap_set_id: int) -> BeatmapsetResp: - logger.opt(colors=True).debug(f"[BeatmapsetFetcher] get_beatmapset: {beatmap_set_id}") + logger.opt(colors=True).debug(f"get_beatmapset: {beatmap_set_id}") return BeatmapsetResp.model_validate( await self.request_api(f"https://osu.ppy.sh/api/v2/beatmapsets/{beatmap_set_id}") @@ -144,7 +147,7 @@ class BeatmapsetFetcher(BaseFetcher): async def search_beatmapset( self, query: SearchQueryModel, cursor: Cursor, redis_client: redis.Redis ) -> SearchBeatmapsetsResp: - logger.opt(colors=True).debug(f"[BeatmapsetFetcher] search_beatmapset: {query}") + logger.opt(colors=True).debug(f"search_beatmapset: {query}") # 生成缓存键 cache_key = self._generate_cache_key(query, cursor) @@ -152,17 +155,15 @@ class BeatmapsetFetcher(BaseFetcher): # 尝试从缓存获取结果 cached_result = await redis_client.get(cache_key) if cached_result: - logger.opt(colors=True).debug(f"[BeatmapsetFetcher] Cache hit for key: {cache_key}") + logger.opt(colors=True).debug(f"Cache hit for key: {cache_key}") try: cached_data = json.loads(cached_result) return SearchBeatmapsetsResp.model_validate(cached_data) except Exception as e: - logger.opt(colors=True).warning( - f"[BeatmapsetFetcher] Cache data invalid, fetching from API: {e}" - ) + logger.opt(colors=True).warning(f"Cache data invalid, fetching from API: {e}") # 缓存未命中,从 API 获取数据 - logger.opt(colors=True).debug("[BeatmapsetFetcher] Cache miss, fetching from API") + logger.opt(colors=True).debug("Cache miss, fetching from API") params = query.model_dump(exclude_none=True, exclude_unset=True, exclude_defaults=True) @@ -186,9 +187,7 @@ class BeatmapsetFetcher(BaseFetcher): cache_ttl = 15 * 60 # 15 分钟 await redis_client.set(cache_key, json.dumps(api_response, separators=(",", ":")), ex=cache_ttl) - logger.opt(colors=True).debug( - f"[BeatmapsetFetcher] Cached result for key: {cache_key} (TTL: {cache_ttl}s)" - ) + logger.opt(colors=True).debug(f"Cached result for key: {cache_key} (TTL: {cache_ttl}s)") resp = SearchBeatmapsetsResp.model_validate(api_response) @@ -204,9 +203,7 @@ class BeatmapsetFetcher(BaseFetcher): try: await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=1) except RateLimitError: - logger.opt(colors=True).info( - "[BeatmapsetFetcher] Prefetch skipped due to rate limit" - ) + logger.opt(colors=True).info("Prefetch skipped due to rate limit") bg_tasks.add_task(delayed_prefetch) @@ -230,14 +227,14 @@ class BeatmapsetFetcher(BaseFetcher): # 使用当前 cursor 请求下一页 next_query = query.model_copy() - logger.opt(colors=True).debug(f"[BeatmapsetFetcher] Prefetching page {page + 1}") + logger.opt(colors=True).debug(f"Prefetching page {page + 1}") # 生成下一页的缓存键 next_cache_key = self._generate_cache_key(next_query, cursor) # 检查是否已经缓存 if await redis_client.exists(next_cache_key): - logger.opt(colors=True).debug(f"[BeatmapsetFetcher] Page {page + 1} already cached") + logger.opt(colors=True).debug(f"Page {page + 1} already cached") # 尝试从缓存获取cursor继续预取 cached_data = await redis_client.get(next_cache_key) if cached_data: @@ -282,22 +279,18 @@ class BeatmapsetFetcher(BaseFetcher): ex=prefetch_ttl, ) - logger.opt(colors=True).debug( - f"[BeatmapsetFetcher] Prefetched page {page + 1} (TTL: {prefetch_ttl}s)" - ) + logger.opt(colors=True).debug(f"Prefetched page {page + 1} (TTL: {prefetch_ttl}s)") except RateLimitError: - logger.opt(colors=True).info("[BeatmapsetFetcher] Prefetch stopped due to rate limit") + logger.opt(colors=True).info("Prefetch stopped due to rate limit") except Exception as e: - logger.opt(colors=True).warning(f"[BeatmapsetFetcher] Prefetch failed: {e}") + logger.opt(colors=True).warning(f"Prefetch failed: {e}") async def warmup_homepage_cache(self, redis_client: redis.Redis) -> None: """预热主页缓存""" homepage_queries = self._get_homepage_queries() - logger.opt(colors=True).info( - f"[BeatmapsetFetcher] Starting homepage cache warmup ({len(homepage_queries)} queries)" - ) + logger.opt(colors=True).info(f"Starting homepage cache warmup ({len(homepage_queries)} queries)") for i, (query, cursor) in enumerate(homepage_queries): try: @@ -309,9 +302,7 @@ class BeatmapsetFetcher(BaseFetcher): # 检查是否已经缓存 if await redis_client.exists(cache_key): - logger.opt(colors=True).debug( - f"[BeatmapsetFetcher] Query {query.sort} already cached" - ) + logger.opt(colors=True).debug(f"Query {query.sort} already cached") continue # 请求并缓存 @@ -334,24 +325,15 @@ class BeatmapsetFetcher(BaseFetcher): ex=cache_ttl, ) - logger.opt(colors=True).info( - f"[BeatmapsetFetcher] Warmed up cache for {query.sort} (TTL: {cache_ttl}s)" - ) + logger.opt(colors=True).info(f"Warmed up cache for {query.sort} (TTL: {cache_ttl}s)") if api_response.get("cursor"): try: await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=2) except RateLimitError: - logger.opt(colors=True).info( - f"[BeatmapsetFetcher] Warmup prefetch " - f"skipped for {query.sort} due to rate limit" - ) + logger.opt(colors=True).info(f"Warmup prefetch skipped for {query.sort} due to rate limit") except RateLimitError: - logger.opt(colors=True).warning( - f"[BeatmapsetFetcher] Warmup skipped for {query.sort} due to rate limit" - ) + logger.opt(colors=True).warning(f"Warmup skipped for {query.sort} due to rate limit") except Exception as e: - logger.opt(colors=True).error( - f"[BeatmapsetFetcher] Failed to warmup cache for {query.sort}: {e}" - ) + logger.opt(colors=True).error(f"Failed to warmup cache for {query.sort}: {e}") diff --git a/app/log.py b/app/log.py index 9186f49..57b8eb0 100644 --- a/app/log.py +++ b/app/log.py @@ -5,6 +5,7 @@ import inspect import logging import re from sys import stdout +from types import FunctionType from typing import TYPE_CHECKING from app.config import settings @@ -107,11 +108,67 @@ class InterceptHandler(logging.Handler): return message +def get_caller_class_name(module_prefix: str = ""): + """获取调用类名/模块名,仅对指定模块前缀生效""" + stack = inspect.stack() + for frame_info in stack[2:]: + module = frame_info.frame.f_globals.get("__name__", "") + if module_prefix and not module.startswith(module_prefix): + continue + + local_vars = frame_info.frame.f_locals + # 实例方法 + if "self" in local_vars: + return local_vars["self"].__class__.__name__ + # 类方法 + if "cls" in local_vars: + return local_vars["cls"].__name__ + + # 静态方法 / 普通函数 -> 尝试通过函数名匹配类 + func_name = frame_info.function + for obj_name, obj in frame_info.frame.f_globals.items(): + if isinstance(obj, type): # 遍历模块内类 + cls = obj + attr = getattr(cls, func_name, None) + if isinstance(attr, (staticmethod, classmethod, FunctionType)): + return cls.__name__ + + # 如果没找到类,返回模块名 + return module + return None + + +def service_logger(name: str) -> Logger: + return logger.bind(service=name) + + +def fetcher_logger(name: str) -> Logger: + return logger.bind(fetcher=name) + + +def dynamic_format(record): + prefix = "" + + fetcher = record["extra"].get("fetcher") + if not fetcher: + fetcher = get_caller_class_name("app.fetcher") + if fetcher: + prefix = f"[{fetcher}] " + + service = record["extra"].get("service") + if not service: + service = get_caller_class_name("app.service") + if service: + prefix = f"[{service}] " + + return f"{{time:YYYY-MM-DD HH:mm:ss}} [{{level}}] | {prefix}{{message}}\n" + + logger.remove() logger.add( stdout, colorize=True, - format=("{time:YYYY-MM-DD HH:mm:ss} [{level}] | {message}"), + format=dynamic_format, level=settings.log_level, diagnose=settings.debug, ) diff --git a/app/service/audio_proxy_service.py b/app/service/audio_proxy_service.py index 0209997..5ff77d9 100644 --- a/app/service/audio_proxy_service.py +++ b/app/service/audio_proxy_service.py @@ -5,9 +5,10 @@ from __future__ import annotations +from app.log import logger + from fastapi import HTTPException import httpx -from loguru import logger import redis.asyncio as redis diff --git a/app/service/beatmapset_update_service.py b/app/service/beatmapset_update_service.py index 5d80947..16199d5 100644 --- a/app/service/beatmapset_update_service.py +++ b/app/service/beatmapset_update_service.py @@ -136,9 +136,7 @@ class BeatmapsetUpdateService: status = BeatmapRankStatus(beatmapset.ranked) if status.has_pp() or status == BeatmapRankStatus.LOVED: return False - logger.opt(colors=True).debug( - f"[BeatmapsetUpdateService] added missing beatmapset {beatmapset_id} " - ) + logger.debug(f"added missing beatmapset {beatmapset_id} ") return True async def add_missing_beatmapsets(self): @@ -167,11 +165,9 @@ class BeatmapsetUpdateService: if await self.add_missing_beatmapset(missing): total += 1 except Exception as e: - logger.opt(colors=True).error( - f"[BeatmapsetUpdateService] failed to add missing beatmapset {missing}: {e}" - ) + logger.error(f"failed to add missing beatmapset {missing}: {e}") if total > 0: - logger.opt(colors=True).info(f"[BeatmapsetUpdateService] added {total} missing beatmapset") + logger.info(f"added {total} missing beatmapset") self._adding_missing = False async def add(self, beatmapset: BeatmapsetResp): @@ -211,23 +207,17 @@ class BeatmapsetUpdateService: processing = ProcessingBeatmapset(beatmapset, sync_record) next_time_delta = processing.calculate_next_sync_time() if not next_time_delta: - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [{beatmapset.id}] " - "beatmapset has transformed to ranked or loved," - " removing from sync list" - ) + logger.info(f"[{beatmapset.id}] beatmapset has transformed to ranked or loved, removing from sync list") await session.delete(sync_record) await session.commit() return sync_record.next_sync_time = utcnow() + next_time_delta - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [{beatmapset.id}] next sync at {sync_record.next_sync_time}" - ) + logger.info(f"[{beatmapset.id}] next sync at {sync_record.next_sync_time}") await session.commit() async def _update_beatmaps(self): async with with_db() as session: - logger.opt(colors=True).info("[BeatmapsetUpdateService] checking for beatmapset updates...") + logger.info("checking for beatmapset updates...") now = utcnow() records = await session.exec( select(BeatmapSync) @@ -235,22 +225,17 @@ class BeatmapsetUpdateService: .order_by(col(BeatmapSync.next_sync_time).desc()) ) for record in records: - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [{record.beatmapset_id}] syncing..." - ) + logger.info(f"[{record.beatmapset_id}] syncing...") try: beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id) except Exception as e: if isinstance(e, HTTPError): - logger.opt(colors=True).warning( - f"[BeatmapsetUpdateService] [{record.beatmapset_id}] " + logger.warning( + f"[{record.beatmapset_id}] " f"failed to fetch beatmapset: [{e.__class__.__name__}] {e}, retrying later" ) else: - logger.opt(colors=True).exception( - f"[BeatmapsetUpdateService] [{record.beatmapset_id}] " - f"unexpected error: {e}, retrying later" - ) + logger.exception(f"[{record.beatmapset_id}] unexpected error: {e}, retrying later") record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA) continue processing = ProcessingBeatmapset(beatmapset, record) @@ -282,18 +267,15 @@ class BeatmapsetUpdateService: next_time_delta = processing.calculate_next_sync_time() if not next_time_delta: - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [{record.beatmapset_id}] beatmapset " + logger.info( + f"[{record.beatmapset_id}] beatmapset " "has transformed to ranked or loved," " removing from sync list" ) await session.delete(record) else: record.next_sync_time = utcnow() + next_time_delta - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [{record.beatmapset_id}] " - f"next sync at {record.next_sync_time}" - ) + logger.info(f"[{record.beatmapset_id}] next sync at {record.next_sync_time}") await session.commit() async def _process_changed_beatmapset(self, beatmapset: BeatmapsetResp): @@ -323,9 +305,7 @@ class BeatmapsetUpdateService: await score.ranked_score.delete(session) total += 1 if total > 0: - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [beatmap: {beatmap_id}] processed {total} old scores" - ) + logger.info(f"[beatmap: {beatmap_id}] processed {total} old scores") await session.commit() for change in changed: @@ -333,28 +313,17 @@ class BeatmapsetUpdateService: try: beatmap = await self.fetcher.get_beatmap(change.beatmap_id) except Exception as e: - logger.opt(colors=True).error( - f"[BeatmapsetUpdateService] [beatmap: {change.beatmap_id}] " - f"failed to fetch added beatmap: {e}, skipping" - ) + logger.error(f"[beatmap: {change.beatmap_id}] failed to fetch added beatmap: {e}, skipping") continue - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [{beatmap.beatmapset_id}] adding beatmap {beatmap.id}" - ) + logger.info(f"[{beatmap.beatmapset_id}] adding beatmap {beatmap.id}") await Beatmap.from_resp_no_save(session, beatmap) else: try: beatmap = await self.fetcher.get_beatmap(change.beatmap_id) except Exception as e: - logger.opt(colors=True).error( - f"[BeatmapsetUpdateService] [beatmap: {change.beatmap_id}] " - f"failed to fetch changed beatmap: {e}, skipping" - ) + logger.error(f"[beatmap: {change.beatmap_id}] failed to fetch changed beatmap: {e}, skipping") continue - logger.opt(colors=True).info( - f"[BeatmapsetUpdateService] [{beatmap.beatmapset_id}] processing beatmap " - f"{beatmap.id} change {change.type}" - ) + logger.info(f"[{beatmap.beatmapset_id}] processing beatmap {beatmap.id} change {change.type}") new_db_beatmap = await Beatmap.from_resp_no_save(session, beatmap) existing_beatmap = await session.get(Beatmap, change.beatmap_id) if existing_beatmap: diff --git a/app/service/verification_service.py b/app/service/verification_service.py index 053486a..778dc02 100644 --- a/app/service/verification_service.py +++ b/app/service/verification_service.py @@ -430,7 +430,7 @@ class LoginSessionService: await db.commit() await db.refresh(session) - logger.info(f"[Login Session] Created session for user {user_id} (new device: {is_new_device})") + logger.info(f"Created session for user {user_id} (new device: {is_new_device})") return session @classmethod @@ -562,7 +562,7 @@ class LoginSessionService: session.device_id = device_info.id if sessions: - logger.info(f"[Login Session] Marked {len(sessions)} session(s) as verified for user {user_id}") + logger.info(f"Marked {len(sessions)} session(s) as verified for user {user_id}") await LoginSessionService.clear_login_method(user_id, token_id, redis) await db.commit() @@ -570,7 +570,7 @@ class LoginSessionService: return len(sessions) > 0 except Exception as e: - logger.error(f"[Login Session] Exception during marking sessions as verified: {e}") + logger.error(f"Exception during marking sessions as verified: {e}") return False @staticmethod