From fce88272b5e80be2ac6c3a740032acacbb66a689 Mon Sep 17 00:00:00 2001
From: MingxuanGame
Date: Fri, 3 Oct 2025 10:15:22 +0000
Subject: [PATCH] refactor(task): move schedulers and startup/shutdown tasks
 into `tasks` directory

---
 app/database/beatmapset.py                  |  17 +-
 app/helpers/geoip_helper.py                 |  14 +-
 app/log.py                                  |  21 +-
 app/router/v2/cache.py                      |   4 +-
 app/scheduler/__init__.py                   |   7 -
 app/scheduler/cache_scheduler.py            | 190 -------------
 app/scheduler/user_cache_scheduler.py       | 124 ---------
 app/service/__init__.py                     |   6 -
 app/service/beatmapset_update_service.py    |  15 +-
 app/service/database_cleanup_service.py     |  17 --
 app/service/email_queue.py                  |   2 +-
 app/service/geoip_scheduler.py              |  55 ----
 app/service/init_geoip.py                   |  30 ---
 app/tasks/__init__.py                       |  28 ++
 app/tasks/beatmapset_update.py              |  21 ++
 app/tasks/cache.py                          | 254 ++++++++++++++++++
 .../calculate_all_user_rank.py              |  15 ++
 app/{service => tasks}/create_banchobot.py  |   2 +
 app/{service => tasks}/daily_challenge.py   |   3 +-
 app/tasks/database_cleanup.py               |  21 ++
 app/tasks/geoip.py                          |  57 ++++
 app/{service => tasks}/load_achievements.py |   0
 app/{service => tasks}/osu_rx_statistics.py |  13 +
 .../recalculate_banned_beatmap.py           |   0
 .../recalculate_failed_score.py             |   0
 main.py                                     |  28 +-
 26 files changed, 464 insertions(+), 480 deletions(-)
 delete mode 100644 app/scheduler/__init__.py
 delete mode 100644 app/scheduler/cache_scheduler.py
 delete mode 100644 app/scheduler/user_cache_scheduler.py
 delete mode 100644 app/service/geoip_scheduler.py
 delete mode 100644 app/service/init_geoip.py
 create mode 100644 app/tasks/__init__.py
 create mode 100644 app/tasks/beatmapset_update.py
 create mode 100644 app/tasks/cache.py
 rename app/{service => tasks}/calculate_all_user_rank.py (82%)
 rename app/{service => tasks}/create_banchobot.py (92%)
 rename app/{service => tasks}/daily_challenge.py (99%)
 create mode 100644 app/tasks/database_cleanup.py
 create mode 100644 app/tasks/geoip.py
 rename app/{service => tasks}/load_achievements.py (100%)
 rename app/{service => tasks}/osu_rx_statistics.py (78%)
 rename app/{service => tasks}/recalculate_banned_beatmap.py (100%)
 rename app/{service => tasks}/recalculate_failed_score.py (100%)

diff --git a/app/database/beatmapset.py b/app/database/beatmapset.py
index fa9abf4..d2df25e 100644
--- a/app/database/beatmapset.py
+++ b/app/database/beatmapset.py
@@ -132,21 +132,20 @@ class Beatmapset(AsyncAttrs, BeatmapsetBase, table=True):
     @classmethod
     async def from_resp_no_save(cls, session: AsyncSession, resp: "BeatmapsetResp", from_: int = 0) -> "Beatmapset":
         d = resp.model_dump()
-        update = {}
         if resp.nominations:
-            update["nominations_required"] = resp.nominations.required
-            update["nominations_current"] = resp.nominations.current
+            d["nominations_required"] = resp.nominations.required
+            d["nominations_current"] = resp.nominations.current
         if resp.hype:
-            update["hype_current"] = resp.hype.current
-            update["hype_required"] = resp.hype.required
+            d["hype_current"] = resp.hype.current
+            d["hype_required"] = resp.hype.required
         if resp.genre_id:
-            update["beatmap_genre"] = Genre(resp.genre_id)
+            d["beatmap_genre"] = Genre(resp.genre_id)
         elif resp.genre:
-            update["beatmap_genre"] = Genre(resp.genre.id)
+            d["beatmap_genre"] = Genre(resp.genre.id)
         if resp.language_id:
-            update["beatmap_language"] = Language(resp.language_id)
+            d["beatmap_language"] = Language(resp.language_id)
         elif resp.language:
-            update["beatmap_language"] = Language(resp.language.id)
+            d["beatmap_language"] = Language(resp.language.id)
         beatmapset = Beatmapset.model_validate(
             {
                 **d,
diff
--git a/app/helpers/geoip_helper.py b/app/helpers/geoip_helper.py index 5be89c6..c0b822e 100644 --- a/app/helpers/geoip_helper.py +++ b/app/helpers/geoip_helper.py @@ -116,22 +116,20 @@ class GeoIPHelper: if age_days >= self.max_age_days: need = True logger.info( - f"[GeoIP] {eid} database is {age_days:.1f} days old " + f"{eid} database is {age_days:.1f} days old " f"(max: {self.max_age_days}), will download new version" ) else: - logger.info( - f"[GeoIP] {eid} database is {age_days:.1f} days old, still fresh (max: {self.max_age_days})" - ) + logger.info(f"{eid} database is {age_days:.1f} days old, still fresh (max: {self.max_age_days})") else: - logger.info(f"[GeoIP] {eid} database not found, will download") + logger.info(f"{eid} database not found, will download") if need: - logger.info(f"[GeoIP] Downloading {eid} database...") + logger.info(f"Downloading {eid} database...") path = self._download_and_extract(eid) - logger.info(f"[GeoIP] {eid} database downloaded successfully") + logger.info(f"{eid} database downloaded successfully") else: - logger.info(f"[GeoIP] Using existing {eid} database") + logger.info(f"Using existing {eid} database") old = self._readers.get(ed) if old: diff --git a/app/log.py b/app/log.py index 57b8eb0..6a8c478 100644 --- a/app/log.py +++ b/app/log.py @@ -9,6 +9,7 @@ from types import FunctionType from typing import TYPE_CHECKING from app.config import settings +from app.utils import snake_to_pascal import loguru @@ -108,7 +109,7 @@ class InterceptHandler(logging.Handler): return message -def get_caller_class_name(module_prefix: str = ""): +def get_caller_class_name(module_prefix: str = "", just_last_part: bool = True) -> str | None: """获取调用类名/模块名,仅对指定模块前缀生效""" stack = inspect.stack() for frame_info in stack[2:]: @@ -134,6 +135,8 @@ def get_caller_class_name(module_prefix: str = ""): return cls.__name__ # 如果没找到类,返回模块名 + if just_last_part: + return module.rsplit(".", 1)[-1] return module return None @@ -146,6 +149,14 @@ def fetcher_logger(name: str) -> Logger: return logger.bind(fetcher=name) +def task_logger(name: str) -> Logger: + return logger.bind(task=name) + + +def system_logger(name: str) -> Logger: + return logger.bind(system=name) + + def dynamic_format(record): prefix = "" @@ -161,6 +172,13 @@ def dynamic_format(record): if service: prefix = f"[{service}] " + task = record["extra"].get("task") + if not task: + task = get_caller_class_name("app.tasks") + if task: + task = snake_to_pascal(task) + prefix = f"[{task}] " + return f"{{time:YYYY-MM-DD HH:mm:ss}} [{{level}}] | {prefix}{{message}}\n" @@ -197,3 +215,4 @@ for logger_name in uvicorn_loggers: uvicorn_logger.propagate = False logging.getLogger("httpx").setLevel("WARNING") +logging.getLogger("apscheduler").setLevel("WARNING") diff --git a/app/router/v2/cache.py b/app/router/v2/cache.py index fe610a6..0b1a396 100644 --- a/app/router/v2/cache.py +++ b/app/router/v2/cache.py @@ -133,9 +133,7 @@ async def warmup_cache( return {"message": f"Warmed up cache for {len(request.user_ids)} users"} else: # 预热活跃用户 - from app.scheduler.user_cache_scheduler import ( - schedule_user_cache_preload_task, - ) + from app.tasks.cache import schedule_user_cache_preload_task await schedule_user_cache_preload_task() return {"message": f"Warmed up cache for top {request.limit} active users"} diff --git a/app/scheduler/__init__.py b/app/scheduler/__init__.py deleted file mode 100644 index d6e4f7c..0000000 --- a/app/scheduler/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -"""缓存调度器模块""" - -from __future__ import annotations - -from 
.cache_scheduler import start_cache_scheduler, stop_cache_scheduler - -__all__ = ["start_cache_scheduler", "stop_cache_scheduler"] diff --git a/app/scheduler/cache_scheduler.py b/app/scheduler/cache_scheduler.py deleted file mode 100644 index 9a36ddb..0000000 --- a/app/scheduler/cache_scheduler.py +++ /dev/null @@ -1,190 +0,0 @@ -from __future__ import annotations - -import asyncio - -from app.config import settings -from app.dependencies.database import get_redis -from app.dependencies.fetcher import get_fetcher -from app.log import logger -from app.scheduler.user_cache_scheduler import ( - schedule_user_cache_cleanup_task, - schedule_user_cache_preload_task, - schedule_user_cache_warmup_task, -) - - -class CacheScheduler: - """缓存调度器 - 统一管理各种缓存任务""" - - def __init__(self): - self.running = False - self.task = None - - async def start(self): - """启动调度器""" - if self.running: - return - - self.running = True - self.task = asyncio.create_task(self._run_scheduler()) - logger.info("CacheScheduler started") - - async def stop(self): - """停止调度器""" - self.running = False - if self.task: - self.task.cancel() - try: - await self.task - except asyncio.CancelledError: - pass - logger.info("CacheScheduler stopped") - - async def _run_scheduler(self): - """运行调度器主循环""" - # 启动时立即执行一次预热 - await self._warmup_cache() - - # 启动时执行一次排行榜缓存刷新 - await self._refresh_ranking_cache() - - # 启动时执行一次用户缓存预热 - await self._warmup_user_cache() - - beatmap_cache_counter = 0 - ranking_cache_counter = 0 - user_cache_counter = 0 - user_cleanup_counter = 0 - - # 从配置文件获取间隔设置 - check_interval = 5 * 60 # 5分钟检查间隔 - beatmap_cache_interval = 30 * 60 # 30分钟beatmap缓存间隔 - ranking_cache_interval = settings.ranking_cache_refresh_interval_minutes * 60 # 从配置读取 - user_cache_interval = 15 * 60 # 15分钟用户缓存预加载间隔 - user_cleanup_interval = 60 * 60 # 60分钟用户缓存清理间隔 - - beatmap_cache_cycles = beatmap_cache_interval // check_interval - ranking_cache_cycles = ranking_cache_interval // check_interval - user_cache_cycles = user_cache_interval // check_interval - user_cleanup_cycles = user_cleanup_interval // check_interval - - while self.running: - try: - # 每5分钟检查一次 - await asyncio.sleep(check_interval) - - if not self.running: - break - - beatmap_cache_counter += 1 - ranking_cache_counter += 1 - user_cache_counter += 1 - user_cleanup_counter += 1 - - # beatmap缓存预热 - if beatmap_cache_counter >= beatmap_cache_cycles: - await self._warmup_cache() - beatmap_cache_counter = 0 - - # 排行榜缓存刷新 - if ranking_cache_counter >= ranking_cache_cycles: - await self._refresh_ranking_cache() - ranking_cache_counter = 0 - - # 用户缓存预加载 - if user_cache_counter >= user_cache_cycles: - await self._preload_user_cache() - user_cache_counter = 0 - - # 用户缓存清理 - if user_cleanup_counter >= user_cleanup_cycles: - await self._cleanup_user_cache() - user_cleanup_counter = 0 - - except asyncio.CancelledError: - break - except Exception as e: - logger.error(f"Cache scheduler error: {e}") - await asyncio.sleep(60) # 出错后等待1分钟再继续 - - async def _warmup_cache(self): - """执行缓存预热""" - try: - logger.info("Starting beatmap cache warmup...") - - fetcher = await get_fetcher() - redis = get_redis() - - # 预热主页缓存 - await fetcher.warmup_homepage_cache(redis) - - logger.info("Beatmap cache warmup completed successfully") - - except Exception as e: - logger.error(f"Beatmap cache warmup failed: {e}") - - async def _refresh_ranking_cache(self): - """刷新排行榜缓存""" - try: - logger.info("Starting ranking cache refresh...") - - redis = get_redis() - - # 导入排行榜缓存服务 - # 使用独立的数据库会话 - from app.dependencies.database 
import with_db - from app.service.ranking_cache_service import ( - schedule_ranking_refresh_task, - ) - - async with with_db() as session: - await schedule_ranking_refresh_task(session, redis) - - logger.info("Ranking cache refresh completed successfully") - - except Exception as e: - logger.error(f"Ranking cache refresh failed: {e}") - - async def _warmup_user_cache(self): - """用户缓存预热""" - try: - await schedule_user_cache_warmup_task() - except Exception as e: - logger.error(f"User cache warmup failed: {e}") - - async def _preload_user_cache(self): - """用户缓存预加载""" - try: - await schedule_user_cache_preload_task() - except Exception as e: - logger.error(f"User cache preload failed: {e}") - - async def _cleanup_user_cache(self): - """用户缓存清理""" - try: - await schedule_user_cache_cleanup_task() - except Exception as e: - logger.error(f"User cache cleanup failed: {e}") - - -# Beatmap缓存调度器(保持向后兼容) -class BeatmapsetCacheScheduler(CacheScheduler): - """谱面集缓存调度器 - 为了向后兼容""" - - pass - - -# 全局调度器实例 -cache_scheduler = CacheScheduler() -# 保持向后兼容的别名 -beatmapset_cache_scheduler = BeatmapsetCacheScheduler() - - -async def start_cache_scheduler(): - """启动缓存调度器""" - await cache_scheduler.start() - - -async def stop_cache_scheduler(): - """停止缓存调度器""" - await cache_scheduler.stop() diff --git a/app/scheduler/user_cache_scheduler.py b/app/scheduler/user_cache_scheduler.py deleted file mode 100644 index 0589daf..0000000 --- a/app/scheduler/user_cache_scheduler.py +++ /dev/null @@ -1,124 +0,0 @@ -""" -用户缓存预热任务调度器 -""" - -from __future__ import annotations - -import asyncio -from datetime import timedelta - -from app.config import settings -from app.database.score import Score -from app.dependencies.database import get_redis -from app.log import logger -from app.service.user_cache_service import get_user_cache_service -from app.utils import utcnow - -from sqlmodel import col, func, select - - -async def schedule_user_cache_preload_task(): - """定时用户缓存预加载任务""" - # 默认启用用户缓存预加载,除非明确禁用 - enable_user_cache_preload = getattr(settings, "enable_user_cache_preload", True) - if not enable_user_cache_preload: - return - - try: - logger.info("Starting user cache preload task...") - - redis = get_redis() - cache_service = get_user_cache_service(redis) - - # 使用独立的数据库会话 - from app.dependencies.database import with_db - - async with with_db() as session: - # 获取最近24小时内活跃的用户(提交过成绩的用户) - recent_time = utcnow() - timedelta(hours=24) - - score_count = func.count().label("score_count") - active_user_ids = ( - await session.exec( - select(Score.user_id, score_count) - .where(col(Score.ended_at) >= recent_time) - .group_by(col(Score.user_id)) - .order_by(score_count.desc()) # 使用标签对象而不是字符串 - .limit(settings.user_cache_max_preload_users) # 使用配置中的限制 - ) - ).all() - - if active_user_ids: - user_ids = [row[0] for row in active_user_ids] - await cache_service.preload_user_cache(session, user_ids) - logger.info(f"Preloaded cache for {len(user_ids)} active users") - else: - logger.info("No active users found for cache preload") - - logger.info("User cache preload task completed successfully") - - except Exception as e: - logger.error(f"User cache preload task failed: {e}") - - -async def schedule_user_cache_warmup_task(): - """定时用户缓存预热任务 - 预加载排行榜前100用户""" - try: - logger.info("Starting user cache warmup task...") - - redis = get_redis() - cache_service = get_user_cache_service(redis) - - # 使用独立的数据库会话 - from app.dependencies.database import with_db - - async with with_db() as session: - # 获取全球排行榜前100的用户 - from app.database.statistics import 
UserStatistics - from app.models.score import GameMode - - for mode in GameMode: - try: - top_users = ( - await session.exec( - select(UserStatistics.user_id) - .where(UserStatistics.mode == mode) - .order_by(col(UserStatistics.pp).desc()) - .limit(100) - ) - ).all() - - if top_users: - user_ids = list(top_users) - await cache_service.preload_user_cache(session, user_ids) - logger.info(f"Warmed cache for top 100 users in {mode}") - - # 避免过载,稍微延迟 - await asyncio.sleep(1) - - except Exception as e: - logger.error(f"Failed to warm cache for {mode}: {e}") - continue - - logger.info("User cache warmup task completed successfully") - - except Exception as e: - logger.error(f"User cache warmup task failed: {e}") - - -async def schedule_user_cache_cleanup_task(): - """定时用户缓存清理任务""" - try: - logger.info("Starting user cache cleanup task...") - - redis = get_redis() - - # 清理过期的用户缓存(Redis会自动处理TTL,这里主要记录统计信息) - cache_service = get_user_cache_service(redis) - stats = await cache_service.get_cache_stats() - - logger.info(f"User cache stats: {stats}") - logger.info("User cache cleanup task completed successfully") - - except Exception as e: - logger.error(f"User cache cleanup task failed: {e}") diff --git a/app/service/__init__.py b/app/service/__init__.py index ced3b75..8ddddd9 100644 --- a/app/service/__init__.py +++ b/app/service/__init__.py @@ -1,14 +1,8 @@ from __future__ import annotations -from .daily_challenge import create_daily_challenge_room -from .recalculate_banned_beatmap import recalculate_banned_beatmap -from .recalculate_failed_score import recalculate_failed_score from .room import create_playlist_room, create_playlist_room_from_api __all__ = [ - "create_daily_challenge_room", "create_playlist_room", "create_playlist_room_from_api", - "recalculate_banned_beatmap", - "recalculate_failed_score", ] diff --git a/app/service/beatmapset_update_service.py b/app/service/beatmapset_update_service.py index 16199d5..a03f91b 100644 --- a/app/service/beatmapset_update_service.py +++ b/app/service/beatmapset_update_service.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import timedelta from enum import Enum import math import random @@ -12,7 +12,6 @@ from app.database.beatmap_sync import BeatmapSync, SavedBeatmapMeta from app.database.beatmapset import Beatmapset, BeatmapsetResp from app.database.score import Score from app.dependencies.database import with_db -from app.dependencies.scheduler import get_scheduler from app.dependencies.storage import get_storage_service from app.log import logger from app.models.beatmap import BeatmapRankStatus @@ -347,15 +346,3 @@ def init_beatmapset_update_service(fetcher: "Fetcher") -> BeatmapsetUpdateServic def get_beatmapset_update_service() -> BeatmapsetUpdateService: assert service is not None, "BeatmapsetUpdateService is not initialized" return service - - -@get_scheduler().scheduled_job( - "interval", - id="update_beatmaps", - minutes=SCHEDULER_INTERVAL_MINUTES, - next_run_time=datetime.now() + timedelta(minutes=1), -) -async def beatmapset_update_job(): - if service is not None: - bg_tasks.add_task(service.add_missing_beatmapsets) - await service._update_beatmaps() diff --git a/app/service/database_cleanup_service.py b/app/service/database_cleanup_service.py index 3acac4a..c8ffd0f 100644 --- a/app/service/database_cleanup_service.py +++ b/app/service/database_cleanup_service.py @@ -8,8 +8,6 @@ from datetime import timedelta from app.database.auth import OAuthToken from app.database.verification 
import EmailVerification, LoginSession, TrustedDevice -from app.dependencies.database import with_db -from app.dependencies.scheduler import get_scheduler from app.log import logger from app.utils import utcnow @@ -434,18 +432,3 @@ class DatabaseCleanupService: "outdated_trusted_devices": 0, "total_cleanable": 0, } - - -@get_scheduler().scheduled_job( - "interval", - id="cleanup_database", - hours=1, -) -async def scheduled_cleanup_job(): - async with with_db() as session: - logger.debug("Starting database cleanup...") - results = await DatabaseCleanupService.run_full_cleanup(session) - total = sum(results.values()) - if total > 0: - logger.debug(f"Cleanup completed, total records cleaned: {total}") - return results diff --git a/app/service/email_queue.py b/app/service/email_queue.py index 9c3ec62..7a07291 100644 --- a/app/service/email_queue.py +++ b/app/service/email_queue.py @@ -17,7 +17,7 @@ import uuid from app.config import settings from app.log import logger -from app.utils import bg_tasks # 添加同步Redis导入 +from app.utils import bg_tasks import redis as sync_redis diff --git a/app/service/geoip_scheduler.py b/app/service/geoip_scheduler.py deleted file mode 100644 index 1169496..0000000 --- a/app/service/geoip_scheduler.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -[GeoIP] Scheduled Update Service -Periodically update the MaxMind GeoIP database -""" - -from __future__ import annotations - -import asyncio - -from app.config import settings -from app.dependencies.geoip import get_geoip_helper -from app.dependencies.scheduler import get_scheduler -from app.log import logger - - -async def update_geoip_database(): - """ - Asynchronous task to update the GeoIP database - """ - try: - logger.info("[GeoIP] Starting scheduled GeoIP database update...") - geoip = get_geoip_helper() - - # Run the synchronous update method in a background thread - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, lambda: geoip.update(force=False)) - - logger.info("[GeoIP] Scheduled GeoIP database update completed successfully") - except Exception as e: - logger.error(f"[GeoIP] Scheduled GeoIP database update failed: {e}") - - -def schedule_geoip_updates(): - """ - Schedule the GeoIP database update task - """ - scheduler = get_scheduler() - - # Use settings to configure the update time: update once a week - scheduler.add_job( - update_geoip_database, - "cron", - day_of_week=settings.geoip_update_day, - hour=settings.geoip_update_hour, - minute=0, - id="geoip_weekly_update", - name="Weekly GeoIP database update", - replace_existing=True, - ) - - logger.info( - f"[GeoIP] Scheduled update task registered: " - f"every week on day {settings.geoip_update_day} " - f"at {settings.geoip_update_hour}:00" - ) diff --git a/app/service/init_geoip.py b/app/service/init_geoip.py deleted file mode 100644 index 95a2edf..0000000 --- a/app/service/init_geoip.py +++ /dev/null @@ -1,30 +0,0 @@ -""" -[GeoIP] Initialization Service -Initialize the GeoIP database when the application starts -""" - -from __future__ import annotations - -import asyncio - -from app.dependencies.geoip import get_geoip_helper -from app.log import logger - - -async def init_geoip(): - """ - Asynchronously initialize the GeoIP database - """ - try: - geoip = get_geoip_helper() - logger.info("[GeoIP] Initializing GeoIP database...") - - # Run the synchronous update method in a background thread - # force=False means only download if files don't exist or are expired - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, lambda: 
geoip.update(force=False))
-
-        logger.info("[GeoIP] GeoIP database initialization completed")
-    except Exception as e:
-        logger.error(f"[GeoIP] GeoIP database initialization failed: {e}")
-        # Do not raise an exception to avoid blocking application startup
diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py
new file mode 100644
index 0000000..6b3332a
--- /dev/null
+++ b/app/tasks/__init__.py
@@ -0,0 +1,28 @@
+# ruff: noqa: F401
+from __future__ import annotations
+
+from . import (
+    beatmapset_update,
+    database_cleanup,
+    recalculate_banned_beatmap,
+    recalculate_failed_score,
+)
+from .cache import start_cache_tasks, stop_cache_tasks
+from .calculate_all_user_rank import calculate_user_rank
+from .create_banchobot import create_banchobot
+from .daily_challenge import daily_challenge_job, process_daily_challenge_top
+from .geoip import init_geoip
+from .load_achievements import load_achievements
+from .osu_rx_statistics import create_rx_statistics
+
+__all__ = [
+    "calculate_user_rank",
+    "create_banchobot",
+    "create_rx_statistics",
+    "daily_challenge_job",
+    "init_geoip",
+    "load_achievements",
+    "process_daily_challenge_top",
+    "start_cache_tasks",
+    "stop_cache_tasks",
+]
diff --git a/app/tasks/beatmapset_update.py b/app/tasks/beatmapset_update.py
new file mode 100644
index 0000000..4e1492e
--- /dev/null
+++ b/app/tasks/beatmapset_update.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+from app.dependencies.scheduler import get_scheduler
+from app.service import beatmapset_update_service
+from app.utils import bg_tasks
+
+SCHEDULER_INTERVAL_MINUTES = 2
+
+
+@get_scheduler().scheduled_job(
+    "interval",
+    id="update_beatmaps",
+    minutes=SCHEDULER_INTERVAL_MINUTES,
+    next_run_time=datetime.now() + timedelta(minutes=1),
+)
+async def beatmapset_update_job():
+    if (service := beatmapset_update_service.service) is not None:
+        bg_tasks.add_task(service.add_missing_beatmapsets)
+        await service._update_beatmaps()
diff --git a/app/tasks/cache.py b/app/tasks/cache.py
new file mode 100644
index 0000000..4a684f6
--- /dev/null
+++ b/app/tasks/cache.py
@@ -0,0 +1,254 @@
+"""APScheduler task entry points for cache maintenance."""
+
+from __future__ import annotations
+
+import asyncio
+from datetime import UTC, timedelta
+from typing import Final
+
+from app.config import settings
+from app.database.score import Score
+from app.dependencies.database import get_redis
+from app.dependencies.fetcher import get_fetcher
+from app.dependencies.scheduler import get_scheduler
+from app.log import logger
+from app.service.ranking_cache_service import schedule_ranking_refresh_task
+from app.service.user_cache_service import get_user_cache_service
+from app.utils import utcnow
+
+from apscheduler.jobstores.base import JobLookupError
+from apscheduler.triggers.interval import IntervalTrigger
+from sqlmodel import col, func, select
+
+CACHE_JOB_IDS: Final[dict[str, str]] = {
+    "beatmap_warmup": "cache:beatmap:warmup",
+    "ranking_refresh": "cache:ranking:refresh",
+    "user_preload": "cache:user:preload",
+    "user_cleanup": "cache:user:cleanup",
+}
+
+
+async def warmup_cache() -> None:
+    """Warm up the beatmap/homepage cache."""
+    try:
+        logger.info("Starting beatmap cache warmup...")
+
+        fetcher = await get_fetcher()
+        redis = get_redis()
+
+        await fetcher.warmup_homepage_cache(redis)
+
+        logger.info("Beatmap cache warmup completed successfully")
+
+    except Exception as e:
+        logger.error("Beatmap cache warmup failed: {}", e)
+
+
+async def refresh_ranking_cache() -> None:
+    """Refresh the ranking cache."""
+    try:
+        logger.info("Starting ranking cache refresh...")
refresh...") + + redis = get_redis() + + from app.dependencies.database import with_db + + async with with_db() as session: + await schedule_ranking_refresh_task(session, redis) + + logger.info("Ranking cache refresh completed successfully") + + except Exception as e: + logger.error("Ranking cache refresh failed: %s", e) + + +async def schedule_user_cache_preload_task() -> None: + """定时用户缓存预加载任务""" + enable_user_cache_preload = getattr(settings, "enable_user_cache_preload", True) + if not enable_user_cache_preload: + return + + try: + logger.info("Starting user cache preload task...") + + redis = get_redis() + cache_service = get_user_cache_service(redis) + + from app.dependencies.database import with_db + + async with with_db() as session: + recent_time = utcnow() - timedelta(hours=24) + + score_count = func.count().label("score_count") + active_user_ids = ( + await session.exec( + select(Score.user_id, score_count) + .where(col(Score.ended_at) >= recent_time) + .group_by(col(Score.user_id)) + .order_by(score_count.desc()) + .limit(settings.user_cache_max_preload_users) + ) + ).all() + + if active_user_ids: + user_ids = [row[0] for row in active_user_ids] + await cache_service.preload_user_cache(session, user_ids) + logger.info("Preloaded cache for %s active users", len(user_ids)) + else: + logger.info("No active users found for cache preload") + + logger.info("User cache preload task completed successfully") + + except Exception as e: + logger.error("User cache preload task failed: %s", e) + + +async def schedule_user_cache_warmup_task() -> None: + """定时用户缓存预热任务 - 预加载排行榜前100用户""" + try: + logger.info("Starting user cache warmup task...") + + redis = get_redis() + cache_service = get_user_cache_service(redis) + + from app.dependencies.database import with_db + + async with with_db() as session: + from app.database.statistics import UserStatistics + from app.models.score import GameMode + + for mode in GameMode: + try: + top_users = ( + await session.exec( + select(UserStatistics.user_id) + .where(UserStatistics.mode == mode) + .order_by(col(UserStatistics.pp).desc()) + .limit(100) + ) + ).all() + + if top_users: + user_ids = list(top_users) + await cache_service.preload_user_cache(session, user_ids) + logger.info("Warmed cache for top 100 users in %s", mode) + + await asyncio.sleep(1) + + except Exception as e: + logger.error("Failed to warm cache for %s: %s", mode, e) + continue + + logger.info("User cache warmup task completed successfully") + + except Exception as e: + logger.error("User cache warmup task failed: %s", e) + + +async def schedule_user_cache_cleanup_task() -> None: + """定时用户缓存清理任务""" + try: + logger.info("Starting user cache cleanup task...") + + redis = get_redis() + + cache_service = get_user_cache_service(redis) + stats = await cache_service.get_cache_stats() + + logger.info("User cache stats: %s", stats) + logger.info("User cache cleanup task completed successfully") + + except Exception as e: + logger.error("User cache cleanup task failed: %s", e) + + +async def warmup_user_cache() -> None: + """用户缓存预热""" + try: + await schedule_user_cache_warmup_task() + except Exception as e: + logger.error("User cache warmup failed: %s", e) + + +async def preload_user_cache() -> None: + """用户缓存预加载""" + try: + await schedule_user_cache_preload_task() + except Exception as e: + logger.error("User cache preload failed: %s", e) + + +async def cleanup_user_cache() -> None: + """用户缓存清理""" + try: + await schedule_user_cache_cleanup_task() + except Exception as e: + logger.error("User 
+
+
+def register_cache_jobs() -> None:
+    """Register the cache-related APScheduler jobs."""
+    scheduler = get_scheduler()
+
+    scheduler.add_job(
+        warmup_cache,
+        trigger=IntervalTrigger(minutes=30, timezone=UTC),
+        id=CACHE_JOB_IDS["beatmap_warmup"],
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+        misfire_grace_time=300,
+    )
+
+    scheduler.add_job(
+        refresh_ranking_cache,
+        trigger=IntervalTrigger(
+            minutes=settings.ranking_cache_refresh_interval_minutes,
+            timezone=UTC,
+        ),
+        id=CACHE_JOB_IDS["ranking_refresh"],
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+        misfire_grace_time=300,
+    )
+
+    scheduler.add_job(
+        preload_user_cache,
+        trigger=IntervalTrigger(minutes=15, timezone=UTC),
+        id=CACHE_JOB_IDS["user_preload"],
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+        misfire_grace_time=300,
+    )
+
+    scheduler.add_job(
+        cleanup_user_cache,
+        trigger=IntervalTrigger(hours=1, timezone=UTC),
+        id=CACHE_JOB_IDS["user_cleanup"],
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+        misfire_grace_time=300,
+    )
+
+    logger.info("Registered cache APScheduler jobs")
+
+
+async def start_cache_tasks() -> None:
+    """Register the APScheduler jobs and run the startup warmups once."""
+    register_cache_jobs()
+    await asyncio.gather(warmup_cache(), refresh_ranking_cache(), warmup_user_cache())
+
+
+async def stop_cache_tasks() -> None:
+    """Remove the cache APScheduler jobs."""
+    scheduler = get_scheduler()
+    for job_id in CACHE_JOB_IDS.values():
+        try:
+            scheduler.remove_job(job_id)
+        except JobLookupError:
+            continue
+
+    logger.info("Cache APScheduler jobs removed")
diff --git a/app/service/calculate_all_user_rank.py b/app/tasks/calculate_all_user_rank.py
similarity index 82%
rename from app/service/calculate_all_user_rank.py
rename to app/tasks/calculate_all_user_rank.py
index 1d1395a..f742d25 100644
--- a/app/service/calculate_all_user_rank.py
+++ b/app/tasks/calculate_all_user_rank.py
@@ -6,6 +6,7 @@ from app.database import RankHistory, UserStatistics
 from app.database.rank_history import RankTop
 from app.dependencies.database import with_db
 from app.dependencies.scheduler import get_scheduler
+from app.log import logger
 from app.models.score import GameMode
 from app.utils import utcnow
 
@@ -16,8 +17,10 @@ from sqlmodel import col, exists, select, update
 async def calculate_user_rank(is_today: bool = False):
     today = utcnow().date()
     target_date = today if is_today else today - timedelta(days=1)
+    logger.info("Starting user rank calculation for {}", target_date)
     async with with_db() as session:
         for gamemode in GameMode:
+            logger.info("Calculating ranks for {} on {}", gamemode.name, target_date)
             users = await session.exec(
                 select(UserStatistics)
                 .where(
@@ -31,6 +34,7 @@
                 )
             )
             rank = 1
+            processed_users = 0
             for user in users:
                 is_exist = (
                     await session.exec(
@@ -82,4 +86,15 @@
                 rank_top.date = today
 
             rank += 1
+            processed_users += 1
         await session.commit()
+        if processed_users > 0:
+            logger.info(
+                "Updated ranks for {} on {} ({} users)",
+                gamemode.name,
+                target_date,
+                processed_users,
+            )
+        else:
+            logger.info("No users found for {} on {}", gamemode.name, target_date)
+    logger.success("User rank calculation completed for {}", target_date)
diff --git a/app/service/create_banchobot.py b/app/tasks/create_banchobot.py
similarity index 92%
rename from app/service/create_banchobot.py
rename to app/tasks/create_banchobot.py
index 16605c5..6148ff1 100644
--- a/app/service/create_banchobot.py
+++
b/app/tasks/create_banchobot.py @@ -4,6 +4,7 @@ from app.const import BANCHOBOT_ID from app.database.statistics import UserStatistics from app.database.user import User from app.dependencies.database import with_db +from app.log import logger from app.models.score import GameMode from sqlmodel import exists, select @@ -27,3 +28,4 @@ async def create_banchobot(): statistics = UserStatistics(user_id=BANCHOBOT_ID, mode=GameMode.OSU) session.add(statistics) await session.commit() + logger.success("BanchoBot user created") diff --git a/app/service/daily_challenge.py b/app/tasks/daily_challenge.py similarity index 99% rename from app/service/daily_challenge.py rename to app/tasks/daily_challenge.py index c0f5aac..e8a7fa3 100644 --- a/app/service/daily_challenge.py +++ b/app/tasks/daily_challenge.py @@ -17,10 +17,9 @@ from app.log import logger from app.models.metadata_hub import DailyChallengeInfo from app.models.mods import APIMod, get_available_mods from app.models.room import RoomCategory +from app.service.room import create_playlist_room from app.utils import are_same_weeks, utcnow -from .room import create_playlist_room - from sqlmodel import col, select diff --git a/app/tasks/database_cleanup.py b/app/tasks/database_cleanup.py new file mode 100644 index 0000000..264e5c1 --- /dev/null +++ b/app/tasks/database_cleanup.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from app.dependencies.database import with_db +from app.dependencies.scheduler import get_scheduler +from app.log import logger +from app.service.database_cleanup_service import DatabaseCleanupService + + +@get_scheduler().scheduled_job( + "interval", + id="cleanup_database", + hours=1, +) +async def scheduled_cleanup_job(): + async with with_db() as session: + logger.info("Starting database cleanup...") + results = await DatabaseCleanupService.run_full_cleanup(session) + total = sum(results.values()) + if total > 0: + logger.success(f"Cleanup completed, total records cleaned: {total}") + return results diff --git a/app/tasks/geoip.py b/app/tasks/geoip.py new file mode 100644 index 0000000..0d22ed8 --- /dev/null +++ b/app/tasks/geoip.py @@ -0,0 +1,57 @@ +""" +Scheduled Update Service +Periodically update the MaxMind GeoIP database +""" + +from __future__ import annotations + +import asyncio + +from app.config import settings +from app.dependencies.geoip import get_geoip_helper +from app.dependencies.scheduler import get_scheduler +from app.log import logger + + +@get_scheduler().scheduled_job( + "cron", + day_of_week=settings.geoip_update_day, + hour=settings.geoip_update_hour, + minute=0, + id="geoip_weekly_update", + name="Weekly GeoIP database update", +) +async def update_geoip_database(): + """ + Asynchronous task to update the GeoIP database + """ + try: + logger.info("Starting scheduled GeoIP database update...") + geoip = get_geoip_helper() + + # Run the synchronous update method in a background thread + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: geoip.update(force=False)) + + logger.info("Scheduled GeoIP database update completed successfully") + except Exception as e: + logger.error(f"Scheduled GeoIP database update failed: {e}") + + +async def init_geoip(): + """ + Asynchronously initialize the GeoIP database + """ + try: + geoip = get_geoip_helper() + logger.info("Initializing GeoIP database...") + + # Run the synchronous update method in a background thread + # force=False means only download if files don't exist or are expired + loop = asyncio.get_event_loop() + await 
loop.run_in_executor(None, lambda: geoip.update(force=False))
+
+        logger.info("GeoIP database initialization completed")
+    except Exception as e:
+        logger.error(f"GeoIP database initialization failed: {e}")
+        # Do not raise an exception to avoid blocking application startup
diff --git a/app/service/load_achievements.py b/app/tasks/load_achievements.py
similarity index 100%
rename from app/service/load_achievements.py
rename to app/tasks/load_achievements.py
diff --git a/app/service/osu_rx_statistics.py b/app/tasks/osu_rx_statistics.py
similarity index 78%
rename from app/service/osu_rx_statistics.py
rename to app/tasks/osu_rx_statistics.py
index ed82189..732d727 100644
--- a/app/service/osu_rx_statistics.py
+++ b/app/tasks/osu_rx_statistics.py
@@ -5,6 +5,7 @@ from app.const import BANCHOBOT_ID
 from app.database.statistics import UserStatistics
 from app.database.user import User
 from app.dependencies.database import with_db
+from app.log import logger
 from app.models.score import GameMode
 
 from sqlalchemy import exists
@@ -14,6 +15,10 @@ from sqlmodel import select
 async def create_rx_statistics():
     async with with_db() as session:
         users = (await session.exec(select(User.id))).all()
+        total_users = len(users)
+        logger.info("Ensuring RX/AP statistics exist for {} users", total_users)
+        rx_created = 0
+        ap_created = 0
         for i in users:
             if i == BANCHOBOT_ID:
                 continue
@@ -35,6 +40,7 @@ async def create_rx_statistics():
             if not is_exist:
                 statistics_rx = UserStatistics(mode=mode, user_id=i)
                 session.add(statistics_rx)
+                rx_created += 1
             if settings.enable_ap:
                 is_exist = (
                     await session.exec(
@@ -47,4 +53,11 @@ async def create_rx_statistics():
             if not is_exist:
                 statistics_ap = UserStatistics(mode=GameMode.OSUAP, user_id=i)
                 session.add(statistics_ap)
+                ap_created += 1
         await session.commit()
+        if rx_created or ap_created:
+            logger.success(
+                "Created {} RX statistics rows and {} AP statistics rows during backfill",
+                rx_created,
+                ap_created,
+            )
diff --git a/app/service/recalculate_banned_beatmap.py b/app/tasks/recalculate_banned_beatmap.py
similarity index 100%
rename from app/service/recalculate_banned_beatmap.py
rename to app/tasks/recalculate_banned_beatmap.py
diff --git a/app/service/recalculate_failed_score.py b/app/tasks/recalculate_failed_score.py
similarity index 100%
rename from app/service/recalculate_failed_score.py
rename to app/tasks/recalculate_failed_score.py
diff --git a/main.py b/main.py
index d5c7f29..d74acfa 100644
--- a/main.py
+++ b/main.py
@@ -24,18 +24,21 @@ from app.router import (
 )
 from app.router.redirect import redirect_router
 from app.router.v1 import api_v1_public_router
-from app.scheduler.cache_scheduler import start_cache_scheduler, stop_cache_scheduler
 from app.service.beatmap_download_service import download_service
 from app.service.beatmapset_update_service import init_beatmapset_update_service
-from app.service.calculate_all_user_rank import calculate_user_rank
-from app.service.create_banchobot import create_banchobot
-from app.service.daily_challenge import daily_challenge_job, process_daily_challenge_top
 from app.service.email_queue import start_email_processor, stop_email_processor
-from app.service.geoip_scheduler import schedule_geoip_updates
-from app.service.init_geoip import init_geoip
-from app.service.load_achievements import load_achievements
-from app.service.osu_rx_statistics import create_rx_statistics
 from app.service.redis_message_system import redis_message_system
+from app.tasks import (
+    calculate_user_rank,
+    create_banchobot,
+    create_rx_statistics,
+    daily_challenge_job,
+    init_geoip,
+    load_achievements,
+    process_daily_challenge_top,
+    start_cache_tasks,
+    stop_cache_tasks,
+)
 from app.utils import bg_tasks, utcnow
 
 from fastapi import FastAPI, HTTPException, Request
@@ -56,17 +59,16 @@ async def lifespan(app: FastAPI):
     await init_geoip()  # 初始化 GeoIP 数据库
     await create_rx_statistics()
     await calculate_user_rank(True)
-    start_scheduler()
-    schedule_geoip_updates()  # 调度 GeoIP 定时更新任务
     await daily_challenge_job()
     await process_daily_challenge_top()
     await create_banchobot()
     await start_email_processor()  # 启动邮件队列处理器
     await download_service.start_health_check()  # 启动下载服务健康检查
-    await start_cache_scheduler()  # 启动缓存调度器
+    await start_cache_tasks()  # register cache jobs and run startup warmups
     init_beatmapset_update_service(fetcher)  # 初始化谱面集更新服务
     redis_message_system.start()  # 启动 Redis 消息系统
     load_achievements()
+    start_scheduler()
 
     # 显示资源代理状态
     if settings.enable_asset_proxy:
@@ -75,9 +77,9 @@ async def lifespan(app: FastAPI):
     # on shutdown
     yield
     bg_tasks.stop()
-    stop_scheduler()
     redis_message_system.stop()  # 停止 Redis 消息系统
-    await stop_cache_scheduler()  # 停止缓存调度器
+    await stop_cache_tasks()  # remove cache jobs
+    stop_scheduler()
     await download_service.stop_health_check()  # 停止下载服务健康检查
     await stop_email_processor()  # 停止邮件队列处理器
     await engine.dispose()
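---

Notes:

1. Loguru message formatting. The project logs through loguru (`app/log.py`), which substitutes `str.format`-style `{}` placeholders when extra arguments are passed; printf-style `%s` placeholders are left in the message verbatim, which is why the task modules above log with `{}`. The `task_logger` helper and the `[Task]` prefix work through `logger.bind()`, which stores values in `record["extra"]` for the format callable to read. A minimal standalone sketch of both behaviors (the `Cache` label is illustrative):

```python
import sys

from loguru import logger


def dynamic_format(record) -> str:
    """Build a '[Prefix] message' format template from bound extra fields."""
    task = record["extra"].get("task")
    prefix = f"[{task}] " if task else ""
    return f"{{time:YYYY-MM-DD HH:mm:ss}} [{{level}}] | {prefix}{{message}}\n"


logger.remove()
logger.add(sys.stderr, format=dynamic_format)

task_log = logger.bind(task="Cache")
task_log.info("Preloaded cache for {} active users", 42)  # 42 is substituted
task_log.info("Preloaded cache for %s active users", 42)  # %s stays literal
```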
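2. Late binding in `app/tasks/beatmapset_update.py`. The job reads the service through the module object (`beatmapset_update_service.service`) rather than `from app.service.beatmapset_update_service import service`, because a from-import copies the binding that exists at import time (`None`) and never sees the instance assigned later by `init_beatmapset_update_service()`. A self-contained illustration (the `svc` module is a hypothetical stand-in built in-process):

```python
import sys
import types

# Stand-in for app.service.beatmapset_update_service, constructed here
# so the example runs as a single file.
svc = types.ModuleType("svc")
svc.service = None
sys.modules["svc"] = svc

from svc import service  # copies the CURRENT value (None) into a new name

svc.service = "ready"  # what init_beatmapset_update_service() effectively does

print(service)      # None  - the from-import snapshot never updates
print(svc.service)  # ready - attribute access sees the assignment
```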
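3. Job options in `register_cache_jobs()`. The explicit `id` plus `replace_existing=True` makes registration idempotent, `max_instances=1` prevents a slow run from overlapping the next trigger, and `coalesce=True` with `misfire_grace_time=300` collapses a backlog of missed runs into a single execution fired up to five minutes late. A runnable sketch under the assumption of APScheduler 3.x (the patch itself passes `datetime.UTC` to `IntervalTrigger`, so a release that accepts plain `tzinfo` objects is assumed; the interval and job body are illustrative):

```python
import asyncio
from datetime import UTC

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger


async def tick() -> None:
    print("cache job fired")


async def main() -> None:
    scheduler = AsyncIOScheduler(timezone=UTC)
    scheduler.add_job(
        tick,
        trigger=IntervalTrigger(seconds=5, timezone=UTC),
        id="demo:tick",
        replace_existing=True,   # re-registering the same id updates in place
        coalesce=True,           # a backlog of missed runs fires only once
        max_instances=1,         # never two overlapping executions
        misfire_grace_time=300,  # run up to 300s late, otherwise skip
    )
    scheduler.start()  # attaches to the running event loop
    await asyncio.sleep(12)  # let the job fire a couple of times
    scheduler.shutdown(wait=False)


asyncio.run(main())
```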
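4. Registration by import. `app/tasks/database_cleanup.py`, `beatmapset_update.py`, and `geoip.py` register their jobs with the `@get_scheduler().scheduled_job(...)` decorator, which calls `add_job()` at decoration time. Importing these modules in `app/tasks/__init__.py` is therefore what wires the jobs up (hence the side-effect imports under `# ruff: noqa: F401`), and `main.py` now calls `start_scheduler()` only after everything has been imported and registered. A minimal sketch of the pattern (`get_scheduler` here is a stand-in for `app.dependencies.scheduler.get_scheduler`):

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler

_scheduler = AsyncIOScheduler()


def get_scheduler() -> AsyncIOScheduler:
    """Stand-in for app.dependencies.scheduler.get_scheduler."""
    return _scheduler


# The decorator calls add_job() at import time, so merely importing this
# module registers the job; it stays pending until the scheduler starts.
@get_scheduler().scheduled_job("interval", id="cleanup_database", hours=1)
async def scheduled_cleanup_job() -> None:
    print("cleanup tick")
```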