diff --git a/app/database/beatmapset.py b/app/database/beatmapset.py
index fa9abf4..d2df25e 100644
--- a/app/database/beatmapset.py
+++ b/app/database/beatmapset.py
@@ -132,21 +132,20 @@ class Beatmapset(AsyncAttrs, BeatmapsetBase, table=True):
@classmethod
async def from_resp_no_save(cls, session: AsyncSession, resp: "BeatmapsetResp", from_: int = 0) -> "Beatmapset":
d = resp.model_dump()
- update = {}
if resp.nominations:
- update["nominations_required"] = resp.nominations.required
- update["nominations_current"] = resp.nominations.current
+ d["nominations_required"] = resp.nominations.required
+ d["nominations_current"] = resp.nominations.current
if resp.hype:
- update["hype_current"] = resp.hype.current
- update["hype_required"] = resp.hype.required
+ d["hype_current"] = resp.hype.current
+ d["hype_required"] = resp.hype.required
if resp.genre_id:
- update["beatmap_genre"] = Genre(resp.genre_id)
+ d["beatmap_genre"] = Genre(resp.genre_id)
elif resp.genre:
- update["beatmap_genre"] = Genre(resp.genre.id)
+ d["beatmap_genre"] = Genre(resp.genre.id)
if resp.language_id:
- update["beatmap_language"] = Language(resp.language_id)
+ d["beatmap_language"] = Language(resp.language_id)
elif resp.language:
- update["beatmap_language"] = Language(resp.language.id)
+ d["beatmap_language"] = Language(resp.language.id)
beatmapset = Beatmapset.model_validate(
{
**d,
diff --git a/app/helpers/geoip_helper.py b/app/helpers/geoip_helper.py
index 5be89c6..c0b822e 100644
--- a/app/helpers/geoip_helper.py
+++ b/app/helpers/geoip_helper.py
@@ -116,22 +116,20 @@ class GeoIPHelper:
if age_days >= self.max_age_days:
need = True
logger.info(
- f"[GeoIP] {eid} database is {age_days:.1f} days old "
+ f"{eid} database is {age_days:.1f} days old "
f"(max: {self.max_age_days}), will download new version"
)
else:
- logger.info(
- f"[GeoIP] {eid} database is {age_days:.1f} days old, still fresh (max: {self.max_age_days})"
- )
+ logger.info(f"{eid} database is {age_days:.1f} days old, still fresh (max: {self.max_age_days})")
else:
- logger.info(f"[GeoIP] {eid} database not found, will download")
+ logger.info(f"{eid} database not found, will download")
if need:
- logger.info(f"[GeoIP] Downloading {eid} database...")
+ logger.info(f"Downloading {eid} database...")
path = self._download_and_extract(eid)
- logger.info(f"[GeoIP] {eid} database downloaded successfully")
+ logger.info(f"{eid} database downloaded successfully")
else:
- logger.info(f"[GeoIP] Using existing {eid} database")
+ logger.info(f"Using existing {eid} database")
old = self._readers.get(ed)
if old:
diff --git a/app/log.py b/app/log.py
index 57b8eb0..6a8c478 100644
--- a/app/log.py
+++ b/app/log.py
@@ -9,6 +9,7 @@ from types import FunctionType
from typing import TYPE_CHECKING
from app.config import settings
+from app.utils import snake_to_pascal
import loguru
@@ -108,7 +109,7 @@ class InterceptHandler(logging.Handler):
return message
-def get_caller_class_name(module_prefix: str = ""):
+def get_caller_class_name(module_prefix: str = "", just_last_part: bool = True) -> str | None:
"""获取调用类名/模块名,仅对指定模块前缀生效"""
stack = inspect.stack()
for frame_info in stack[2:]:
@@ -134,6 +135,8 @@ def get_caller_class_name(module_prefix: str = ""):
return cls.__name__
# 如果没找到类,返回模块名
+ if just_last_part:
+ return module.rsplit(".", 1)[-1]
return module
return None
@@ -146,6 +149,14 @@ def fetcher_logger(name: str) -> Logger:
return logger.bind(fetcher=name)
+def task_logger(name: str) -> Logger:
+ return logger.bind(task=name)
+
+
+def system_logger(name: str) -> Logger:
+ return logger.bind(system=name)
+
+
def dynamic_format(record):
prefix = ""
@@ -161,6 +172,13 @@ def dynamic_format(record):
if service:
prefix = f"[{service}] "
+ task = record["extra"].get("task")
+ if not task:
+ task = get_caller_class_name("app.tasks")
+ if task:
+ task = snake_to_pascal(task)
+ prefix = f"[{task}] "
+
return f"{{time:YYYY-MM-DD HH:mm:ss}} [{{level}}] | {prefix}{{message}}\n"
@@ -197,3 +215,4 @@ for logger_name in uvicorn_loggers:
uvicorn_logger.propagate = False
logging.getLogger("httpx").setLevel("WARNING")
+logging.getLogger("apscheduler").setLevel("WARNING")
diff --git a/app/router/v2/cache.py b/app/router/v2/cache.py
index fe610a6..0b1a396 100644
--- a/app/router/v2/cache.py
+++ b/app/router/v2/cache.py
@@ -133,9 +133,7 @@ async def warmup_cache(
return {"message": f"Warmed up cache for {len(request.user_ids)} users"}
else:
# 预热活跃用户
- from app.scheduler.user_cache_scheduler import (
- schedule_user_cache_preload_task,
- )
+ from app.tasks.cache import schedule_user_cache_preload_task
await schedule_user_cache_preload_task()
return {"message": f"Warmed up cache for top {request.limit} active users"}
diff --git a/app/scheduler/__init__.py b/app/scheduler/__init__.py
deleted file mode 100644
index d6e4f7c..0000000
--- a/app/scheduler/__init__.py
+++ /dev/null
@@ -1,7 +0,0 @@
-"""缓存调度器模块"""
-
-from __future__ import annotations
-
-from .cache_scheduler import start_cache_scheduler, stop_cache_scheduler
-
-__all__ = ["start_cache_scheduler", "stop_cache_scheduler"]
diff --git a/app/scheduler/cache_scheduler.py b/app/scheduler/cache_scheduler.py
deleted file mode 100644
index 9a36ddb..0000000
--- a/app/scheduler/cache_scheduler.py
+++ /dev/null
@@ -1,190 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-
-from app.config import settings
-from app.dependencies.database import get_redis
-from app.dependencies.fetcher import get_fetcher
-from app.log import logger
-from app.scheduler.user_cache_scheduler import (
- schedule_user_cache_cleanup_task,
- schedule_user_cache_preload_task,
- schedule_user_cache_warmup_task,
-)
-
-
-class CacheScheduler:
- """缓存调度器 - 统一管理各种缓存任务"""
-
- def __init__(self):
- self.running = False
- self.task = None
-
- async def start(self):
- """启动调度器"""
- if self.running:
- return
-
- self.running = True
- self.task = asyncio.create_task(self._run_scheduler())
- logger.info("CacheScheduler started")
-
- async def stop(self):
- """停止调度器"""
- self.running = False
- if self.task:
- self.task.cancel()
- try:
- await self.task
- except asyncio.CancelledError:
- pass
- logger.info("CacheScheduler stopped")
-
- async def _run_scheduler(self):
- """运行调度器主循环"""
- # 启动时立即执行一次预热
- await self._warmup_cache()
-
- # 启动时执行一次排行榜缓存刷新
- await self._refresh_ranking_cache()
-
- # 启动时执行一次用户缓存预热
- await self._warmup_user_cache()
-
- beatmap_cache_counter = 0
- ranking_cache_counter = 0
- user_cache_counter = 0
- user_cleanup_counter = 0
-
- # 从配置文件获取间隔设置
- check_interval = 5 * 60 # 5分钟检查间隔
- beatmap_cache_interval = 30 * 60 # 30分钟beatmap缓存间隔
- ranking_cache_interval = settings.ranking_cache_refresh_interval_minutes * 60 # 从配置读取
- user_cache_interval = 15 * 60 # 15分钟用户缓存预加载间隔
- user_cleanup_interval = 60 * 60 # 60分钟用户缓存清理间隔
-
- beatmap_cache_cycles = beatmap_cache_interval // check_interval
- ranking_cache_cycles = ranking_cache_interval // check_interval
- user_cache_cycles = user_cache_interval // check_interval
- user_cleanup_cycles = user_cleanup_interval // check_interval
-
- while self.running:
- try:
- # 每5分钟检查一次
- await asyncio.sleep(check_interval)
-
- if not self.running:
- break
-
- beatmap_cache_counter += 1
- ranking_cache_counter += 1
- user_cache_counter += 1
- user_cleanup_counter += 1
-
- # beatmap缓存预热
- if beatmap_cache_counter >= beatmap_cache_cycles:
- await self._warmup_cache()
- beatmap_cache_counter = 0
-
- # 排行榜缓存刷新
- if ranking_cache_counter >= ranking_cache_cycles:
- await self._refresh_ranking_cache()
- ranking_cache_counter = 0
-
- # 用户缓存预加载
- if user_cache_counter >= user_cache_cycles:
- await self._preload_user_cache()
- user_cache_counter = 0
-
- # 用户缓存清理
- if user_cleanup_counter >= user_cleanup_cycles:
- await self._cleanup_user_cache()
- user_cleanup_counter = 0
-
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error(f"Cache scheduler error: {e}")
- await asyncio.sleep(60) # 出错后等待1分钟再继续
-
- async def _warmup_cache(self):
- """执行缓存预热"""
- try:
- logger.info("Starting beatmap cache warmup...")
-
- fetcher = await get_fetcher()
- redis = get_redis()
-
- # 预热主页缓存
- await fetcher.warmup_homepage_cache(redis)
-
- logger.info("Beatmap cache warmup completed successfully")
-
- except Exception as e:
- logger.error(f"Beatmap cache warmup failed: {e}")
-
- async def _refresh_ranking_cache(self):
- """刷新排行榜缓存"""
- try:
- logger.info("Starting ranking cache refresh...")
-
- redis = get_redis()
-
- # 导入排行榜缓存服务
- # 使用独立的数据库会话
- from app.dependencies.database import with_db
- from app.service.ranking_cache_service import (
- schedule_ranking_refresh_task,
- )
-
- async with with_db() as session:
- await schedule_ranking_refresh_task(session, redis)
-
- logger.info("Ranking cache refresh completed successfully")
-
- except Exception as e:
- logger.error(f"Ranking cache refresh failed: {e}")
-
- async def _warmup_user_cache(self):
- """用户缓存预热"""
- try:
- await schedule_user_cache_warmup_task()
- except Exception as e:
- logger.error(f"User cache warmup failed: {e}")
-
- async def _preload_user_cache(self):
- """用户缓存预加载"""
- try:
- await schedule_user_cache_preload_task()
- except Exception as e:
- logger.error(f"User cache preload failed: {e}")
-
- async def _cleanup_user_cache(self):
- """用户缓存清理"""
- try:
- await schedule_user_cache_cleanup_task()
- except Exception as e:
- logger.error(f"User cache cleanup failed: {e}")
-
-
-# Beatmap缓存调度器(保持向后兼容)
-class BeatmapsetCacheScheduler(CacheScheduler):
- """谱面集缓存调度器 - 为了向后兼容"""
-
- pass
-
-
-# 全局调度器实例
-cache_scheduler = CacheScheduler()
-# 保持向后兼容的别名
-beatmapset_cache_scheduler = BeatmapsetCacheScheduler()
-
-
-async def start_cache_scheduler():
- """启动缓存调度器"""
- await cache_scheduler.start()
-
-
-async def stop_cache_scheduler():
- """停止缓存调度器"""
- await cache_scheduler.stop()
diff --git a/app/scheduler/user_cache_scheduler.py b/app/scheduler/user_cache_scheduler.py
deleted file mode 100644
index 0589daf..0000000
--- a/app/scheduler/user_cache_scheduler.py
+++ /dev/null
@@ -1,124 +0,0 @@
-"""
-用户缓存预热任务调度器
-"""
-
-from __future__ import annotations
-
-import asyncio
-from datetime import timedelta
-
-from app.config import settings
-from app.database.score import Score
-from app.dependencies.database import get_redis
-from app.log import logger
-from app.service.user_cache_service import get_user_cache_service
-from app.utils import utcnow
-
-from sqlmodel import col, func, select
-
-
-async def schedule_user_cache_preload_task():
- """定时用户缓存预加载任务"""
- # 默认启用用户缓存预加载,除非明确禁用
- enable_user_cache_preload = getattr(settings, "enable_user_cache_preload", True)
- if not enable_user_cache_preload:
- return
-
- try:
- logger.info("Starting user cache preload task...")
-
- redis = get_redis()
- cache_service = get_user_cache_service(redis)
-
- # 使用独立的数据库会话
- from app.dependencies.database import with_db
-
- async with with_db() as session:
- # 获取最近24小时内活跃的用户(提交过成绩的用户)
- recent_time = utcnow() - timedelta(hours=24)
-
- score_count = func.count().label("score_count")
- active_user_ids = (
- await session.exec(
- select(Score.user_id, score_count)
- .where(col(Score.ended_at) >= recent_time)
- .group_by(col(Score.user_id))
- .order_by(score_count.desc()) # 使用标签对象而不是字符串
- .limit(settings.user_cache_max_preload_users) # 使用配置中的限制
- )
- ).all()
-
- if active_user_ids:
- user_ids = [row[0] for row in active_user_ids]
- await cache_service.preload_user_cache(session, user_ids)
- logger.info(f"Preloaded cache for {len(user_ids)} active users")
- else:
- logger.info("No active users found for cache preload")
-
- logger.info("User cache preload task completed successfully")
-
- except Exception as e:
- logger.error(f"User cache preload task failed: {e}")
-
-
-async def schedule_user_cache_warmup_task():
- """定时用户缓存预热任务 - 预加载排行榜前100用户"""
- try:
- logger.info("Starting user cache warmup task...")
-
- redis = get_redis()
- cache_service = get_user_cache_service(redis)
-
- # 使用独立的数据库会话
- from app.dependencies.database import with_db
-
- async with with_db() as session:
- # 获取全球排行榜前100的用户
- from app.database.statistics import UserStatistics
- from app.models.score import GameMode
-
- for mode in GameMode:
- try:
- top_users = (
- await session.exec(
- select(UserStatistics.user_id)
- .where(UserStatistics.mode == mode)
- .order_by(col(UserStatistics.pp).desc())
- .limit(100)
- )
- ).all()
-
- if top_users:
- user_ids = list(top_users)
- await cache_service.preload_user_cache(session, user_ids)
- logger.info(f"Warmed cache for top 100 users in {mode}")
-
- # 避免过载,稍微延迟
- await asyncio.sleep(1)
-
- except Exception as e:
- logger.error(f"Failed to warm cache for {mode}: {e}")
- continue
-
- logger.info("User cache warmup task completed successfully")
-
- except Exception as e:
- logger.error(f"User cache warmup task failed: {e}")
-
-
-async def schedule_user_cache_cleanup_task():
- """定时用户缓存清理任务"""
- try:
- logger.info("Starting user cache cleanup task...")
-
- redis = get_redis()
-
- # 清理过期的用户缓存(Redis会自动处理TTL,这里主要记录统计信息)
- cache_service = get_user_cache_service(redis)
- stats = await cache_service.get_cache_stats()
-
- logger.info(f"User cache stats: {stats}")
- logger.info("User cache cleanup task completed successfully")
-
- except Exception as e:
- logger.error(f"User cache cleanup task failed: {e}")
diff --git a/app/service/__init__.py b/app/service/__init__.py
index ced3b75..8ddddd9 100644
--- a/app/service/__init__.py
+++ b/app/service/__init__.py
@@ -1,14 +1,8 @@
from __future__ import annotations
-from .daily_challenge import create_daily_challenge_room
-from .recalculate_banned_beatmap import recalculate_banned_beatmap
-from .recalculate_failed_score import recalculate_failed_score
from .room import create_playlist_room, create_playlist_room_from_api
__all__ = [
- "create_daily_challenge_room",
"create_playlist_room",
"create_playlist_room_from_api",
- "recalculate_banned_beatmap",
- "recalculate_failed_score",
]
diff --git a/app/service/beatmapset_update_service.py b/app/service/beatmapset_update_service.py
index 16199d5..a03f91b 100644
--- a/app/service/beatmapset_update_service.py
+++ b/app/service/beatmapset_update_service.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from datetime import datetime, timedelta
+from datetime import timedelta
from enum import Enum
import math
import random
@@ -12,7 +12,6 @@ from app.database.beatmap_sync import BeatmapSync, SavedBeatmapMeta
from app.database.beatmapset import Beatmapset, BeatmapsetResp
from app.database.score import Score
from app.dependencies.database import with_db
-from app.dependencies.scheduler import get_scheduler
from app.dependencies.storage import get_storage_service
from app.log import logger
from app.models.beatmap import BeatmapRankStatus
@@ -347,15 +346,3 @@ def init_beatmapset_update_service(fetcher: "Fetcher") -> BeatmapsetUpdateServic
def get_beatmapset_update_service() -> BeatmapsetUpdateService:
assert service is not None, "BeatmapsetUpdateService is not initialized"
return service
-
-
-@get_scheduler().scheduled_job(
- "interval",
- id="update_beatmaps",
- minutes=SCHEDULER_INTERVAL_MINUTES,
- next_run_time=datetime.now() + timedelta(minutes=1),
-)
-async def beatmapset_update_job():
- if service is not None:
- bg_tasks.add_task(service.add_missing_beatmapsets)
- await service._update_beatmaps()
diff --git a/app/service/database_cleanup_service.py b/app/service/database_cleanup_service.py
index 3acac4a..c8ffd0f 100644
--- a/app/service/database_cleanup_service.py
+++ b/app/service/database_cleanup_service.py
@@ -8,8 +8,6 @@ from datetime import timedelta
from app.database.auth import OAuthToken
from app.database.verification import EmailVerification, LoginSession, TrustedDevice
-from app.dependencies.database import with_db
-from app.dependencies.scheduler import get_scheduler
from app.log import logger
from app.utils import utcnow
@@ -434,18 +432,3 @@ class DatabaseCleanupService:
"outdated_trusted_devices": 0,
"total_cleanable": 0,
}
-
-
-@get_scheduler().scheduled_job(
- "interval",
- id="cleanup_database",
- hours=1,
-)
-async def scheduled_cleanup_job():
- async with with_db() as session:
- logger.debug("Starting database cleanup...")
- results = await DatabaseCleanupService.run_full_cleanup(session)
- total = sum(results.values())
- if total > 0:
- logger.debug(f"Cleanup completed, total records cleaned: {total}")
- return results
diff --git a/app/service/email_queue.py b/app/service/email_queue.py
index 9c3ec62..7a07291 100644
--- a/app/service/email_queue.py
+++ b/app/service/email_queue.py
@@ -17,7 +17,7 @@ import uuid
from app.config import settings
from app.log import logger
-from app.utils import bg_tasks # 添加同步Redis导入
+from app.utils import bg_tasks
import redis as sync_redis
diff --git a/app/service/geoip_scheduler.py b/app/service/geoip_scheduler.py
deleted file mode 100644
index 1169496..0000000
--- a/app/service/geoip_scheduler.py
+++ /dev/null
@@ -1,55 +0,0 @@
-"""
-[GeoIP] Scheduled Update Service
-Periodically update the MaxMind GeoIP database
-"""
-
-from __future__ import annotations
-
-import asyncio
-
-from app.config import settings
-from app.dependencies.geoip import get_geoip_helper
-from app.dependencies.scheduler import get_scheduler
-from app.log import logger
-
-
-async def update_geoip_database():
- """
- Asynchronous task to update the GeoIP database
- """
- try:
- logger.info("[GeoIP] Starting scheduled GeoIP database update...")
- geoip = get_geoip_helper()
-
- # Run the synchronous update method in a background thread
- loop = asyncio.get_event_loop()
- await loop.run_in_executor(None, lambda: geoip.update(force=False))
-
- logger.info("[GeoIP] Scheduled GeoIP database update completed successfully")
- except Exception as e:
- logger.error(f"[GeoIP] Scheduled GeoIP database update failed: {e}")
-
-
-def schedule_geoip_updates():
- """
- Schedule the GeoIP database update task
- """
- scheduler = get_scheduler()
-
- # Use settings to configure the update time: update once a week
- scheduler.add_job(
- update_geoip_database,
- "cron",
- day_of_week=settings.geoip_update_day,
- hour=settings.geoip_update_hour,
- minute=0,
- id="geoip_weekly_update",
- name="Weekly GeoIP database update",
- replace_existing=True,
- )
-
- logger.info(
- f"[GeoIP] Scheduled update task registered: "
- f"every week on day {settings.geoip_update_day} "
- f"at {settings.geoip_update_hour}:00"
- )
diff --git a/app/service/init_geoip.py b/app/service/init_geoip.py
deleted file mode 100644
index 95a2edf..0000000
--- a/app/service/init_geoip.py
+++ /dev/null
@@ -1,30 +0,0 @@
-"""
-[GeoIP] Initialization Service
-Initialize the GeoIP database when the application starts
-"""
-
-from __future__ import annotations
-
-import asyncio
-
-from app.dependencies.geoip import get_geoip_helper
-from app.log import logger
-
-
-async def init_geoip():
- """
- Asynchronously initialize the GeoIP database
- """
- try:
- geoip = get_geoip_helper()
- logger.info("[GeoIP] Initializing GeoIP database...")
-
- # Run the synchronous update method in a background thread
- # force=False means only download if files don't exist or are expired
- loop = asyncio.get_event_loop()
- await loop.run_in_executor(None, lambda: geoip.update(force=False))
-
- logger.info("[GeoIP] GeoIP database initialization completed")
- except Exception as e:
- logger.error(f"[GeoIP] GeoIP database initialization failed: {e}")
- # Do not raise an exception to avoid blocking application startup
diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py
new file mode 100644
index 0000000..6b3332a
--- /dev/null
+++ b/app/tasks/__init__.py
@@ -0,0 +1,28 @@
+# ruff: noqa: F401
+from __future__ import annotations
+
+from . import (
+ beatmapset_update,
+ database_cleanup,
+ recalculate_banned_beatmap,
+ recalculate_failed_score,
+)
+from .cache import start_cache_tasks, stop_cache_tasks
+from .calculate_all_user_rank import calculate_user_rank
+from .create_banchobot import create_banchobot
+from .daily_challenge import daily_challenge_job, process_daily_challenge_top
+from .geoip import init_geoip
+from .load_achievements import load_achievements
+from .osu_rx_statistics import create_rx_statistics
+
+__all__ = [
+ "calculate_user_rank",
+ "create_banchobot",
+ "create_rx_statistics",
+ "daily_challenge_job",
+ "init_geoip",
+ "load_achievements",
+ "process_daily_challenge_top",
+ "start_cache_tasks",
+ "stop_cache_tasks",
+]
diff --git a/app/tasks/beatmapset_update.py b/app/tasks/beatmapset_update.py
new file mode 100644
index 0000000..4e1492e
--- /dev/null
+++ b/app/tasks/beatmapset_update.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+from app.dependencies.scheduler import get_scheduler
+from app.service.beatmapset_update_service import service
+from app.utils import bg_tasks
+
+SCHEDULER_INTERVAL_MINUTES = 2
+
+
+@get_scheduler().scheduled_job(
+ "interval",
+ id="update_beatmaps",
+ minutes=SCHEDULER_INTERVAL_MINUTES,
+ next_run_time=datetime.now() + timedelta(minutes=1),
+)
+async def beatmapset_update_job():
+ if service is not None:
+ bg_tasks.add_task(service.add_missing_beatmapsets)
+ await service._update_beatmaps()
diff --git a/app/tasks/cache.py b/app/tasks/cache.py
new file mode 100644
index 0000000..4a684f6
--- /dev/null
+++ b/app/tasks/cache.py
@@ -0,0 +1,254 @@
+"""缓存相关的 APScheduler 任务入口。"""
+
+from __future__ import annotations
+
+import asyncio
+from datetime import UTC, timedelta
+from typing import Final
+
+from app.config import settings
+from app.database.score import Score
+from app.dependencies.database import get_redis
+from app.dependencies.fetcher import get_fetcher
+from app.dependencies.scheduler import get_scheduler
+from app.log import logger
+from app.service.ranking_cache_service import schedule_ranking_refresh_task
+from app.service.user_cache_service import get_user_cache_service
+from app.utils import utcnow
+
+from apscheduler.jobstores.base import JobLookupError
+from apscheduler.triggers.interval import IntervalTrigger
+from sqlmodel import col, func, select
+
+CACHE_JOB_IDS: Final[dict[str, str]] = {
+ "beatmap_warmup": "cache:beatmap:warmup",
+ "ranking_refresh": "cache:ranking:refresh",
+ "user_preload": "cache:user:preload",
+ "user_cleanup": "cache:user:cleanup",
+}
+
+
+async def warmup_cache() -> None:
+ """执行缓存预热"""
+ try:
+ logger.info("Starting beatmap cache warmup...")
+
+ fetcher = await get_fetcher()
+ redis = get_redis()
+
+ await fetcher.warmup_homepage_cache(redis)
+
+ logger.info("Beatmap cache warmup completed successfully")
+
+ except Exception as e:
+ logger.error("Beatmap cache warmup failed: %s", e)
+
+
+async def refresh_ranking_cache() -> None:
+ """刷新排行榜缓存"""
+ try:
+ logger.info("Starting ranking cache refresh...")
+
+ redis = get_redis()
+
+ from app.dependencies.database import with_db
+
+ async with with_db() as session:
+ await schedule_ranking_refresh_task(session, redis)
+
+ logger.info("Ranking cache refresh completed successfully")
+
+ except Exception as e:
+ logger.error("Ranking cache refresh failed: %s", e)
+
+
+async def schedule_user_cache_preload_task() -> None:
+ """定时用户缓存预加载任务"""
+ enable_user_cache_preload = getattr(settings, "enable_user_cache_preload", True)
+ if not enable_user_cache_preload:
+ return
+
+ try:
+ logger.info("Starting user cache preload task...")
+
+ redis = get_redis()
+ cache_service = get_user_cache_service(redis)
+
+ from app.dependencies.database import with_db
+
+ async with with_db() as session:
+ recent_time = utcnow() - timedelta(hours=24)
+
+ score_count = func.count().label("score_count")
+ active_user_ids = (
+ await session.exec(
+ select(Score.user_id, score_count)
+ .where(col(Score.ended_at) >= recent_time)
+ .group_by(col(Score.user_id))
+ .order_by(score_count.desc())
+ .limit(settings.user_cache_max_preload_users)
+ )
+ ).all()
+
+ if active_user_ids:
+ user_ids = [row[0] for row in active_user_ids]
+ await cache_service.preload_user_cache(session, user_ids)
+ logger.info("Preloaded cache for %s active users", len(user_ids))
+ else:
+ logger.info("No active users found for cache preload")
+
+ logger.info("User cache preload task completed successfully")
+
+ except Exception as e:
+ logger.error("User cache preload task failed: %s", e)
+
+
+async def schedule_user_cache_warmup_task() -> None:
+ """定时用户缓存预热任务 - 预加载排行榜前100用户"""
+ try:
+ logger.info("Starting user cache warmup task...")
+
+ redis = get_redis()
+ cache_service = get_user_cache_service(redis)
+
+ from app.dependencies.database import with_db
+
+ async with with_db() as session:
+ from app.database.statistics import UserStatistics
+ from app.models.score import GameMode
+
+ for mode in GameMode:
+ try:
+ top_users = (
+ await session.exec(
+ select(UserStatistics.user_id)
+ .where(UserStatistics.mode == mode)
+ .order_by(col(UserStatistics.pp).desc())
+ .limit(100)
+ )
+ ).all()
+
+ if top_users:
+ user_ids = list(top_users)
+ await cache_service.preload_user_cache(session, user_ids)
+ logger.info("Warmed cache for top 100 users in %s", mode)
+
+ await asyncio.sleep(1)
+
+ except Exception as e:
+ logger.error("Failed to warm cache for %s: %s", mode, e)
+ continue
+
+ logger.info("User cache warmup task completed successfully")
+
+ except Exception as e:
+ logger.error("User cache warmup task failed: %s", e)
+
+
+async def schedule_user_cache_cleanup_task() -> None:
+ """定时用户缓存清理任务"""
+ try:
+ logger.info("Starting user cache cleanup task...")
+
+ redis = get_redis()
+
+ cache_service = get_user_cache_service(redis)
+ stats = await cache_service.get_cache_stats()
+
+ logger.info("User cache stats: %s", stats)
+ logger.info("User cache cleanup task completed successfully")
+
+ except Exception as e:
+ logger.error("User cache cleanup task failed: %s", e)
+
+
+async def warmup_user_cache() -> None:
+ """用户缓存预热"""
+ try:
+ await schedule_user_cache_warmup_task()
+ except Exception as e:
+ logger.error("User cache warmup failed: %s", e)
+
+
+async def preload_user_cache() -> None:
+ """用户缓存预加载"""
+ try:
+ await schedule_user_cache_preload_task()
+ except Exception as e:
+ logger.error("User cache preload failed: %s", e)
+
+
+async def cleanup_user_cache() -> None:
+ """用户缓存清理"""
+ try:
+ await schedule_user_cache_cleanup_task()
+ except Exception as e:
+ logger.error("User cache cleanup failed: %s", e)
+
+
+def register_cache_jobs() -> None:
+ """注册缓存相关 APScheduler 任务"""
+ scheduler = get_scheduler()
+
+ scheduler.add_job(
+ warmup_cache,
+ trigger=IntervalTrigger(minutes=30, timezone=UTC),
+ id=CACHE_JOB_IDS["beatmap_warmup"],
+ replace_existing=True,
+ coalesce=True,
+ max_instances=1,
+ misfire_grace_time=300,
+ )
+
+ scheduler.add_job(
+ refresh_ranking_cache,
+ trigger=IntervalTrigger(
+ minutes=settings.ranking_cache_refresh_interval_minutes,
+ timezone=UTC,
+ ),
+ id=CACHE_JOB_IDS["ranking_refresh"],
+ replace_existing=True,
+ coalesce=True,
+ max_instances=1,
+ misfire_grace_time=300,
+ )
+
+ scheduler.add_job(
+ preload_user_cache,
+ trigger=IntervalTrigger(minutes=15, timezone=UTC),
+ id=CACHE_JOB_IDS["user_preload"],
+ replace_existing=True,
+ coalesce=True,
+ max_instances=1,
+ misfire_grace_time=300,
+ )
+
+ scheduler.add_job(
+ cleanup_user_cache,
+ trigger=IntervalTrigger(hours=1, timezone=UTC),
+ id=CACHE_JOB_IDS["user_cleanup"],
+ replace_existing=True,
+ coalesce=True,
+ max_instances=1,
+ misfire_grace_time=300,
+ )
+
+ logger.info("Registered cache APScheduler jobs")
+
+
+async def start_cache_tasks() -> None:
+ """注册 APScheduler 任务并执行启动时任务"""
+ register_cache_jobs()
+ logger.info("Cache APScheduler jobs registered; running initial tasks")
+
+
+async def stop_cache_tasks() -> None:
+ """移除 APScheduler 任务"""
+ scheduler = get_scheduler()
+ for job_id in CACHE_JOB_IDS.values():
+ try:
+ scheduler.remove_job(job_id)
+ except JobLookupError:
+ continue
+
+ logger.info("Cache APScheduler jobs removed")
diff --git a/app/service/calculate_all_user_rank.py b/app/tasks/calculate_all_user_rank.py
similarity index 82%
rename from app/service/calculate_all_user_rank.py
rename to app/tasks/calculate_all_user_rank.py
index 1d1395a..f742d25 100644
--- a/app/service/calculate_all_user_rank.py
+++ b/app/tasks/calculate_all_user_rank.py
@@ -6,6 +6,7 @@ from app.database import RankHistory, UserStatistics
from app.database.rank_history import RankTop
from app.dependencies.database import with_db
from app.dependencies.scheduler import get_scheduler
+from app.log import logger
from app.models.score import GameMode
from app.utils import utcnow
@@ -16,8 +17,10 @@ from sqlmodel import col, exists, select, update
async def calculate_user_rank(is_today: bool = False):
today = utcnow().date()
target_date = today if is_today else today - timedelta(days=1)
+ logger.info("Starting user rank calculation for {}", target_date)
async with with_db() as session:
for gamemode in GameMode:
+ logger.info("Calculating ranks for {} on {}", gamemode.name, target_date)
users = await session.exec(
select(UserStatistics)
.where(
@@ -31,6 +34,7 @@ async def calculate_user_rank(is_today: bool = False):
)
)
rank = 1
+ processed_users = 0
for user in users:
is_exist = (
await session.exec(
@@ -82,4 +86,15 @@ async def calculate_user_rank(is_today: bool = False):
rank_top.date = today
rank += 1
+ processed_users += 1
await session.commit()
+ if processed_users > 0:
+ logger.info(
+ "Updated ranks for {} on {} ({} users)",
+ gamemode.name,
+ target_date,
+ processed_users,
+ )
+ else:
+ logger.info("No users found for {} on {}", gamemode.name, target_date)
+ logger.success("User rank calculation completed for {}", target_date)
diff --git a/app/service/create_banchobot.py b/app/tasks/create_banchobot.py
similarity index 92%
rename from app/service/create_banchobot.py
rename to app/tasks/create_banchobot.py
index 16605c5..6148ff1 100644
--- a/app/service/create_banchobot.py
+++ b/app/tasks/create_banchobot.py
@@ -4,6 +4,7 @@ from app.const import BANCHOBOT_ID
from app.database.statistics import UserStatistics
from app.database.user import User
from app.dependencies.database import with_db
+from app.log import logger
from app.models.score import GameMode
from sqlmodel import exists, select
@@ -27,3 +28,4 @@ async def create_banchobot():
statistics = UserStatistics(user_id=BANCHOBOT_ID, mode=GameMode.OSU)
session.add(statistics)
await session.commit()
+ logger.success("BanchoBot user created")
diff --git a/app/service/daily_challenge.py b/app/tasks/daily_challenge.py
similarity index 99%
rename from app/service/daily_challenge.py
rename to app/tasks/daily_challenge.py
index c0f5aac..e8a7fa3 100644
--- a/app/service/daily_challenge.py
+++ b/app/tasks/daily_challenge.py
@@ -17,10 +17,9 @@ from app.log import logger
from app.models.metadata_hub import DailyChallengeInfo
from app.models.mods import APIMod, get_available_mods
from app.models.room import RoomCategory
+from app.service.room import create_playlist_room
from app.utils import are_same_weeks, utcnow
-from .room import create_playlist_room
-
from sqlmodel import col, select
diff --git a/app/tasks/database_cleanup.py b/app/tasks/database_cleanup.py
new file mode 100644
index 0000000..264e5c1
--- /dev/null
+++ b/app/tasks/database_cleanup.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+from app.dependencies.database import with_db
+from app.dependencies.scheduler import get_scheduler
+from app.log import logger
+from app.service.database_cleanup_service import DatabaseCleanupService
+
+
+@get_scheduler().scheduled_job(
+ "interval",
+ id="cleanup_database",
+ hours=1,
+)
+async def scheduled_cleanup_job():
+ async with with_db() as session:
+ logger.info("Starting database cleanup...")
+ results = await DatabaseCleanupService.run_full_cleanup(session)
+ total = sum(results.values())
+ if total > 0:
+ logger.success(f"Cleanup completed, total records cleaned: {total}")
+ return results
diff --git a/app/tasks/geoip.py b/app/tasks/geoip.py
new file mode 100644
index 0000000..0d22ed8
--- /dev/null
+++ b/app/tasks/geoip.py
@@ -0,0 +1,57 @@
+"""
+Scheduled Update Service
+Periodically update the MaxMind GeoIP database
+"""
+
+from __future__ import annotations
+
+import asyncio
+
+from app.config import settings
+from app.dependencies.geoip import get_geoip_helper
+from app.dependencies.scheduler import get_scheduler
+from app.log import logger
+
+
+@get_scheduler().scheduled_job(
+ "cron",
+ day_of_week=settings.geoip_update_day,
+ hour=settings.geoip_update_hour,
+ minute=0,
+ id="geoip_weekly_update",
+ name="Weekly GeoIP database update",
+)
+async def update_geoip_database():
+ """
+ Asynchronous task to update the GeoIP database
+ """
+ try:
+ logger.info("Starting scheduled GeoIP database update...")
+ geoip = get_geoip_helper()
+
+ # Run the synchronous update method in a background thread
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, lambda: geoip.update(force=False))
+
+ logger.info("Scheduled GeoIP database update completed successfully")
+ except Exception as e:
+ logger.error(f"Scheduled GeoIP database update failed: {e}")
+
+
+async def init_geoip():
+ """
+ Asynchronously initialize the GeoIP database
+ """
+ try:
+ geoip = get_geoip_helper()
+ logger.info("Initializing GeoIP database...")
+
+ # Run the synchronous update method in a background thread
+ # force=False means only download if files don't exist or are expired
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, lambda: geoip.update(force=False))
+
+ logger.info("GeoIP database initialization completed")
+ except Exception as e:
+ logger.error(f"GeoIP database initialization failed: {e}")
+ # Do not raise an exception to avoid blocking application startup
diff --git a/app/service/load_achievements.py b/app/tasks/load_achievements.py
similarity index 100%
rename from app/service/load_achievements.py
rename to app/tasks/load_achievements.py
diff --git a/app/service/osu_rx_statistics.py b/app/tasks/osu_rx_statistics.py
similarity index 78%
rename from app/service/osu_rx_statistics.py
rename to app/tasks/osu_rx_statistics.py
index ed82189..732d727 100644
--- a/app/service/osu_rx_statistics.py
+++ b/app/tasks/osu_rx_statistics.py
@@ -5,6 +5,7 @@ from app.const import BANCHOBOT_ID
from app.database.statistics import UserStatistics
from app.database.user import User
from app.dependencies.database import with_db
+from app.log import logger
from app.models.score import GameMode
from sqlalchemy import exists
@@ -14,6 +15,10 @@ from sqlmodel import select
async def create_rx_statistics():
async with with_db() as session:
users = (await session.exec(select(User.id))).all()
+ total_users = len(users)
+ logger.info("Ensuring RX/AP statistics exist for %s users", total_users)
+ rx_created = 0
+ ap_created = 0
for i in users:
if i == BANCHOBOT_ID:
continue
@@ -35,6 +40,7 @@ async def create_rx_statistics():
if not is_exist:
statistics_rx = UserStatistics(mode=mode, user_id=i)
session.add(statistics_rx)
+ rx_created += 1
if settings.enable_ap:
is_exist = (
await session.exec(
@@ -47,4 +53,11 @@ async def create_rx_statistics():
if not is_exist:
statistics_ap = UserStatistics(mode=GameMode.OSUAP, user_id=i)
session.add(statistics_ap)
+ ap_created += 1
await session.commit()
+ if rx_created or ap_created:
+ logger.success(
+ "Created %s RX statistics rows and %s AP statistics rows during backfill",
+ rx_created,
+ ap_created,
+ )
diff --git a/app/service/recalculate_banned_beatmap.py b/app/tasks/recalculate_banned_beatmap.py
similarity index 100%
rename from app/service/recalculate_banned_beatmap.py
rename to app/tasks/recalculate_banned_beatmap.py
diff --git a/app/service/recalculate_failed_score.py b/app/tasks/recalculate_failed_score.py
similarity index 100%
rename from app/service/recalculate_failed_score.py
rename to app/tasks/recalculate_failed_score.py
diff --git a/main.py b/main.py
index d5c7f29..d74acfa 100644
--- a/main.py
+++ b/main.py
@@ -24,18 +24,21 @@ from app.router import (
)
from app.router.redirect import redirect_router
from app.router.v1 import api_v1_public_router
-from app.scheduler.cache_scheduler import start_cache_scheduler, stop_cache_scheduler
from app.service.beatmap_download_service import download_service
from app.service.beatmapset_update_service import init_beatmapset_update_service
-from app.service.calculate_all_user_rank import calculate_user_rank
-from app.service.create_banchobot import create_banchobot
-from app.service.daily_challenge import daily_challenge_job, process_daily_challenge_top
from app.service.email_queue import start_email_processor, stop_email_processor
-from app.service.geoip_scheduler import schedule_geoip_updates
-from app.service.init_geoip import init_geoip
-from app.service.load_achievements import load_achievements
-from app.service.osu_rx_statistics import create_rx_statistics
from app.service.redis_message_system import redis_message_system
+from app.tasks import (
+ calculate_user_rank,
+ create_banchobot,
+ create_rx_statistics,
+ daily_challenge_job,
+ init_geoip,
+ load_achievements,
+ process_daily_challenge_top,
+ start_cache_tasks,
+ stop_cache_tasks,
+)
from app.utils import bg_tasks, utcnow
from fastapi import FastAPI, HTTPException, Request
@@ -56,17 +59,16 @@ async def lifespan(app: FastAPI):
await init_geoip() # 初始化 GeoIP 数据库
await create_rx_statistics()
await calculate_user_rank(True)
- start_scheduler()
- schedule_geoip_updates() # 调度 GeoIP 定时更新任务
await daily_challenge_job()
await process_daily_challenge_top()
await create_banchobot()
await start_email_processor() # 启动邮件队列处理器
await download_service.start_health_check() # 启动下载服务健康检查
- await start_cache_scheduler() # 启动缓存调度器
+ await start_cache_tasks() # 启动缓存调度器
init_beatmapset_update_service(fetcher) # 初始化谱面集更新服务
redis_message_system.start() # 启动 Redis 消息系统
load_achievements()
+ start_scheduler()
# 显示资源代理状态
if settings.enable_asset_proxy:
@@ -75,9 +77,9 @@ async def lifespan(app: FastAPI):
# on shutdown
yield
bg_tasks.stop()
- stop_scheduler()
redis_message_system.stop() # 停止 Redis 消息系统
- await stop_cache_scheduler() # 停止缓存调度器
+ await stop_cache_tasks() # 停止缓存调度器
+ stop_scheduler()
await download_service.stop_health_check() # 停止下载服务健康检查
await stop_email_processor() # 停止邮件队列处理器
await engine.dispose()