# Source file: g0v0-server/app/tasks/cache.py (255 lines, 7.7 KiB, Python)
"""缓存相关的 APScheduler 任务入口。"""
from __future__ import annotations
import asyncio
from datetime import UTC, timedelta
from typing import Final
from app.config import settings
from app.database.score import Score
from app.dependencies.database import get_redis
from app.dependencies.fetcher import get_fetcher
from app.dependencies.scheduler import get_scheduler
from app.log import logger
from app.service.ranking_cache_service import schedule_ranking_refresh_task
from app.service.user_cache_service import get_user_cache_service
from app.utils import utcnow
from apscheduler.jobstores.base import JobLookupError
from apscheduler.triggers.interval import IntervalTrigger
from sqlmodel import col, func, select
# Stable APScheduler job identifiers for every cache-related task, keyed by a
# short task name. Referenced both when registering and when removing jobs.
CACHE_JOB_IDS: Final[dict[str, str]] = dict(
    beatmap_warmup="cache:beatmap:warmup",
    ranking_refresh="cache:ranking:refresh",
    user_preload="cache:user:preload",
    user_cleanup="cache:user:cleanup",
)
async def warmup_cache() -> None:
    """Warm up the beatmap cache by priming the homepage data via the fetcher.

    Best-effort scheduler entry point: any failure is logged (with traceback)
    and swallowed so the APScheduler job itself never raises.
    """
    try:
        logger.info("Starting beatmap cache warmup...")
        fetcher = await get_fetcher()
        redis = get_redis()
        await fetcher.warmup_homepage_cache(redis)
        logger.info("Beatmap cache warmup completed successfully")
    except Exception as e:
        # logger.exception keeps the traceback; logging only str(e) would
        # hide where the failure actually happened.
        logger.exception("Beatmap cache warmup failed: %s", e)
async def refresh_ranking_cache() -> None:
    """Refresh the ranking (leaderboard) cache.

    Opens a short-lived DB session and delegates the actual refresh to
    ``schedule_ranking_refresh_task``. Best-effort: failures are logged with
    traceback and swallowed so the scheduler job does not crash.
    """
    try:
        logger.info("Starting ranking cache refresh...")
        redis = get_redis()
        # Local import avoids a potential import cycle at module load time.
        from app.dependencies.database import with_db

        async with with_db() as session:
            await schedule_ranking_refresh_task(session, redis)
        logger.info("Ranking cache refresh completed successfully")
    except Exception as e:
        # logger.exception preserves the traceback for diagnosis.
        logger.exception("Ranking cache refresh failed: %s", e)
async def schedule_user_cache_preload_task() -> None:
    """Preload the user cache for the most active users of the last 24 hours.

    Queries scores ended within the last 24 hours, ranks users by score count,
    and preloads the cache for the top ``settings.user_cache_max_preload_users``
    of them. Controlled by the ``enable_user_cache_preload`` setting (treated
    as enabled when the setting is absent). Best-effort: failures are logged
    with traceback and swallowed.
    """
    # Feature flag; getattr keeps old configs without the setting working.
    if not getattr(settings, "enable_user_cache_preload", True):
        return
    try:
        logger.info("Starting user cache preload task...")
        redis = get_redis()
        cache_service = get_user_cache_service(redis)
        # Local import avoids a potential import cycle at module load time.
        from app.dependencies.database import with_db

        async with with_db() as session:
            recent_time = utcnow() - timedelta(hours=24)
            score_count = func.count().label("score_count")
            # Most active users first: group recent scores by user and order
            # by how many scores each user submitted.
            active_user_ids = (
                await session.exec(
                    select(Score.user_id, score_count)
                    .where(col(Score.ended_at) >= recent_time)
                    .group_by(col(Score.user_id))
                    .order_by(score_count.desc())
                    .limit(settings.user_cache_max_preload_users)
                )
            ).all()
            if active_user_ids:
                # Each row is (user_id, score_count); only the id is needed.
                user_ids = [row[0] for row in active_user_ids]
                await cache_service.preload_user_cache(session, user_ids)
                logger.info("Preloaded cache for %s active users", len(user_ids))
            else:
                logger.info("No active users found for cache preload")
        logger.info("User cache preload task completed successfully")
    except Exception as e:
        # logger.exception preserves the traceback for diagnosis.
        logger.exception("User cache preload task failed: %s", e)
async def schedule_user_cache_warmup_task() -> None:
    """Warm the user cache for the top 100 ranked users of every game mode.

    For each ``GameMode``, selects the 100 users with the highest pp from
    ``UserStatistics`` and preloads their cache entries, pausing one second
    between modes to avoid hammering the backend. A failure in one mode is
    logged and does not stop the remaining modes.
    """
    try:
        logger.info("Starting user cache warmup task...")
        redis = get_redis()
        cache_service = get_user_cache_service(redis)
        # Local imports avoid potential import cycles at module load time.
        from app.dependencies.database import with_db

        async with with_db() as session:
            from app.database.statistics import UserStatistics
            from app.models.score import GameMode

            for mode in GameMode:
                try:
                    top_users = (
                        await session.exec(
                            select(UserStatistics.user_id)
                            .where(UserStatistics.mode == mode)
                            .order_by(col(UserStatistics.pp).desc())
                            .limit(100)
                        )
                    ).all()
                    if top_users:
                        user_ids = list(top_users)
                        await cache_service.preload_user_cache(session, user_ids)
                        logger.info("Warmed cache for top 100 users in %s", mode)
                        # Throttle between modes so warmup doesn't spike load.
                        await asyncio.sleep(1)
                except Exception as e:
                    # Per-mode isolation: log (with traceback) and move on to
                    # the next mode. The trailing `continue` the original had
                    # was redundant — the loop advances anyway.
                    logger.exception("Failed to warm cache for %s: %s", mode, e)
        logger.info("User cache warmup task completed successfully")
    except Exception as e:
        # logger.exception preserves the traceback for diagnosis.
        logger.exception("User cache warmup task failed: %s", e)
async def schedule_user_cache_cleanup_task() -> None:
    """Periodic user-cache maintenance: currently only reports cache stats.

    NOTE(review): despite the name, nothing is evicted here — the body only
    fetches and logs ``get_cache_stats()``. Presumably expiry is handled by
    Redis TTLs; confirm, or add actual cleanup logic.
    """
    try:
        logger.info("Starting user cache cleanup task...")
        redis = get_redis()
        cache_service = get_user_cache_service(redis)
        stats = await cache_service.get_cache_stats()
        logger.info("User cache stats: %s", stats)
        logger.info("User cache cleanup task completed successfully")
    except Exception as e:
        # logger.exception preserves the traceback for diagnosis.
        logger.exception("User cache cleanup task failed: %s", e)
async def warmup_user_cache() -> None:
    """Scheduler-facing wrapper that delegates to the user-cache warmup task.

    Kept as a separate entry point so the scheduled job has a stable name;
    failures are logged and swallowed.
    """
    try:
        await schedule_user_cache_warmup_task()
    except Exception as exc:
        logger.error("User cache warmup failed: %s", exc)
async def preload_user_cache() -> None:
    """Scheduler-facing wrapper that delegates to the user-cache preload task.

    Kept as a separate entry point so the scheduled job has a stable name;
    failures are logged and swallowed.
    """
    try:
        await schedule_user_cache_preload_task()
    except Exception as exc:
        logger.error("User cache preload failed: %s", exc)
async def cleanup_user_cache() -> None:
    """Scheduler-facing wrapper that delegates to the user-cache cleanup task.

    Kept as a separate entry point so the scheduled job has a stable name;
    failures are logged and swallowed.
    """
    try:
        await schedule_user_cache_cleanup_task()
    except Exception as exc:
        logger.error("User cache cleanup failed: %s", exc)
def register_cache_jobs() -> None:
    """Register all cache-related jobs with the shared APScheduler instance."""
    scheduler = get_scheduler()
    # Options shared by every cache job: replace any stale registration,
    # collapse missed runs into one, never overlap instances, and tolerate
    # up to 5 minutes of misfire.
    common_opts = {
        "replace_existing": True,
        "coalesce": True,
        "max_instances": 1,
        "misfire_grace_time": 300,
    }
    # (callable, trigger, job id) for each cache job.
    job_specs = [
        (
            warmup_cache,
            IntervalTrigger(minutes=30, timezone=UTC),
            CACHE_JOB_IDS["beatmap_warmup"],
        ),
        (
            refresh_ranking_cache,
            IntervalTrigger(
                minutes=settings.ranking_cache_refresh_interval_minutes,
                timezone=UTC,
            ),
            CACHE_JOB_IDS["ranking_refresh"],
        ),
        (
            preload_user_cache,
            IntervalTrigger(minutes=15, timezone=UTC),
            CACHE_JOB_IDS["user_preload"],
        ),
        (
            cleanup_user_cache,
            IntervalTrigger(hours=1, timezone=UTC),
            CACHE_JOB_IDS["user_cleanup"],
        ),
    ]
    for task, trigger, job_id in job_specs:
        scheduler.add_job(task, trigger=trigger, id=job_id, **common_opts)
    logger.info("Registered cache APScheduler jobs")
async def start_cache_tasks() -> None:
    """Register the cache APScheduler jobs at application startup.

    NOTE(review): the log message mentions "running initial tasks", but no
    task is awaited here — jobs only fire on their interval triggers.
    Confirm whether an immediate first run was intended.
    """
    register_cache_jobs()
    logger.info("Cache APScheduler jobs registered; running initial tasks")
async def stop_cache_tasks() -> None:
    """Remove every cache job from the scheduler, ignoring unknown job ids."""
    scheduler = get_scheduler()
    for job_id in CACHE_JOB_IDS.values():
        try:
            scheduler.remove_job(job_id)
        except JobLookupError:
            # Job was never registered or is already gone — nothing to do.
            pass
    logger.info("Cache APScheduler jobs removed")