diff --git a/.env.example b/.env.example index 72c3a0e..f1a89a1 100644 --- a/.env.example +++ b/.env.example @@ -68,12 +68,12 @@ GEOIP_UPDATE_HOUR=2 # Game Settings ENABLE_RX=false ENABLE_AP=false -ENABLE_ALL_MODS_PP=false ENABLE_SUPPORTER_FOR_ALL_USERS=false ENABLE_ALL_BEATMAP_LEADERBOARD=false ENABLE_ALL_BEATMAP_PP=false SEASONAL_BACKGROUNDS='[]' BEATMAP_TAG_TOP_COUNT=2 +OLD_SCORE_PROCESSING_MODE=normal # Beatmap Cache Settings ENABLE_BEATMAP_PRELOAD=true diff --git a/app/config.py b/app/config.py index 437de19..0254364 100644 --- a/app/config.py +++ b/app/config.py @@ -38,6 +38,11 @@ class StorageServiceType(str, Enum): AWS_S3 = "s3" +class OldScoreProcessingMode(str, Enum): + STRICT = "strict" + NORMAL = "normal" + + SPECTATOR_DOC = """ ## 旁观服务器设置 | 变量名 | 描述 | 类型 | 默认值 | @@ -458,6 +463,16 @@ STORAGE_SETTINGS='{ Field(default=2, description="显示在结算列表的标签所需的最低票数"), "游戏设置", ] + old_score_processing_mode: Annotated[ + OldScoreProcessingMode, + Field( + default=OldScoreProcessingMode.NORMAL, + description=( + "旧成绩处理模式
strict: 删除所有相关的成绩、pp、统计信息、回放
normal: 删除 pp 和排行榜成绩" + ), + ), + "游戏设置", + ] # 谱面缓存设置 enable_beatmap_preload: Annotated[ diff --git a/app/database/__init__.py b/app/database/__init__.py index 60267d8..bac7cc1 100644 --- a/app/database/__init__.py +++ b/app/database/__init__.py @@ -5,6 +5,7 @@ from .beatmap import ( BeatmapResp, ) from .beatmap_playcounts import BeatmapPlaycounts, BeatmapPlaycountsResp +from .beatmap_sync import BeatmapSync from .beatmap_tags import BeatmapTagVote from .beatmapset import ( Beatmapset, @@ -76,6 +77,7 @@ __all__ = [ "BeatmapPlaycountsResp", "BeatmapRating", "BeatmapResp", + "BeatmapSync", "BeatmapTagVote", "Beatmapset", "BeatmapsetResp", diff --git a/app/database/beatmap.py b/app/database/beatmap.py index 54dc0b2..0f4e565 100644 --- a/app/database/beatmap.py +++ b/app/database/beatmap.py @@ -71,7 +71,7 @@ class Beatmap(BeatmapBase, table=True): failtimes: FailTime | None = Relationship(back_populates="beatmap", sa_relationship_kwargs={"lazy": "joined"}) @classmethod - async def from_resp(cls, session: AsyncSession, resp: "BeatmapResp") -> "Beatmap": + async def from_resp_no_save(cls, session: AsyncSession, resp: "BeatmapResp") -> "Beatmap": d = resp.model_dump() del d["beatmapset"] beatmap = Beatmap.model_validate( @@ -82,11 +82,16 @@ class Beatmap(BeatmapBase, table=True): "beatmap_status": BeatmapRankStatus(resp.ranked), } ) + return beatmap + + @classmethod + async def from_resp(cls, session: AsyncSession, resp: "BeatmapResp") -> "Beatmap": + beatmap = await cls.from_resp_no_save(session, resp) if not (await session.exec(select(exists()).where(Beatmap.id == resp.id))).first(): session.add(beatmap) await session.commit() - beatmap = (await session.exec(select(Beatmap).where(Beatmap.id == resp.id))).first() - assert beatmap is not None, "Beatmap should not be None after commit" + beatmap = (await session.exec(select(Beatmap).where(Beatmap.id == resp.id))).first() + assert beatmap is not None, "Beatmap should not be None after commit" return beatmap @classmethod diff --git a/app/database/beatmap_sync.py b/app/database/beatmap_sync.py new file mode 100644 index 0000000..92fc748 --- /dev/null +++ b/app/database/beatmap_sync.py @@ -0,0 +1,23 @@ +from datetime import datetime +from typing import TypedDict + +from app.models.beatmap import BeatmapRankStatus +from app.utils import utcnow + +from sqlmodel import JSON, Column, DateTime, Field, SQLModel + + +class SavedBeatmapMeta(TypedDict): + beatmap_id: int + md5: str + is_deleted: bool + beatmap_status: BeatmapRankStatus + + +class BeatmapSync(SQLModel, table=True): + beatmapset_id: int = Field(primary_key=True, foreign_key="beatmapsets.id") + beatmaps: list[SavedBeatmapMeta] = Field(sa_column=Column(JSON)) + beatmap_status: BeatmapRankStatus = Field(index=True) + consecutive_no_change: int = Field(default=0) + next_sync_time: datetime = Field(default_factory=utcnow, sa_column=Column(DateTime, index=True)) + updated_at: datetime = Field(default_factory=utcnow, sa_column=Column(DateTime, index=True)) diff --git a/app/database/beatmapset.py b/app/database/beatmapset.py index ded31ed..1ef14f5 100644 --- a/app/database/beatmapset.py +++ b/app/database/beatmapset.py @@ -130,9 +130,7 @@ class Beatmapset(AsyncAttrs, BeatmapsetBase, table=True): favourites: list["FavouriteBeatmapset"] = Relationship(back_populates="beatmapset") @classmethod - async def from_resp(cls, session: AsyncSession, resp: "BeatmapsetResp", from_: int = 0) -> "Beatmapset": - from .beatmap import Beatmap - + async def from_resp_no_save(cls, session: AsyncSession, resp: "BeatmapsetResp", from_: int = 0) -> "Beatmapset": d = resp.model_dump() update = {} if resp.nominations: @@ -158,6 +156,13 @@ class Beatmapset(AsyncAttrs, BeatmapsetBase, table=True): "download_disabled": resp.availability.download_disabled or False, } ) + return beatmapset + + @classmethod + async def from_resp(cls, session: AsyncSession, resp: "BeatmapsetResp", from_: int = 0) -> "Beatmapset": + from .beatmap import Beatmap + + beatmapset = await cls.from_resp_no_save(session, resp, from_=from_) if not (await session.exec(select(exists()).where(Beatmapset.id == resp.id))).first(): session.add(beatmapset) await session.commit() @@ -166,10 +171,13 @@ class Beatmapset(AsyncAttrs, BeatmapsetBase, table=True): @classmethod async def get_or_fetch(cls, session: AsyncSession, fetcher: "Fetcher", sid: int) -> "Beatmapset": + from app.service.beatmapset_update_service import get_beatmapset_update_service + beatmapset = await session.get(Beatmapset, sid) if not beatmapset: resp = await fetcher.get_beatmapset(sid) beatmapset = await cls.from_resp(session, resp) + await get_beatmapset_update_service().add(resp) return beatmapset diff --git a/app/database/best_score.py b/app/database/best_score.py index 8179453..eaaa5d8 100644 --- a/app/database/best_score.py +++ b/app/database/best_score.py @@ -1,5 +1,7 @@ from typing import TYPE_CHECKING +from app.calculator import calculate_score_to_level +from app.database.statistics import UserStatistics from app.models.score import GameMode, Rank from .lazer_user import User @@ -12,7 +14,11 @@ from sqlmodel import ( ForeignKey, Relationship, SQLModel, + col, + func, + select, ) +from sqlmodel.ext.asyncio.session import AsyncSession if TYPE_CHECKING: from .beatmap import Beatmap @@ -37,6 +43,43 @@ class BestScore(SQLModel, table=True): sa_relationship_kwargs={ "foreign_keys": "[BestScore.score_id]", "lazy": "joined", - } + }, + back_populates="best_score", ) beatmap: "Beatmap" = Relationship() + + async def delete(self, session: AsyncSession): + from .score import Score + + statistics = await session.exec( + select(UserStatistics).where(UserStatistics.user_id == self.user_id, UserStatistics.mode == self.gamemode) + ) + statistics = statistics.first() + if statistics: + statistics.total_score -= self.total_score + statistics.ranked_score -= self.total_score + statistics.level_current = calculate_score_to_level(statistics.total_score) + match self.rank: + case Rank.X: + statistics.grade_ss -= 1 + case Rank.XH: + statistics.grade_ssh -= 1 + case Rank.S: + statistics.grade_s -= 1 + case Rank.SH: + statistics.grade_sh -= 1 + case Rank.A: + statistics.grade_a -= 1 + + max_combo = ( + await session.exec( + select(func.max(Score.max_combo)).where( + Score.user_id == self.user_id, + col(Score.id).in_(select(BestScore.score_id)), + Score.gamemode == self.gamemode, + ) + ) + ).first() + statistics.maximum_combo = max(0, max_combo or 0) + + await session.delete(self) diff --git a/app/database/pp_best_score.py b/app/database/pp_best_score.py index 2141630..ad82114 100644 --- a/app/database/pp_best_score.py +++ b/app/database/pp_best_score.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING +from app.database.statistics import UserStatistics from app.models.score import GameMode from .lazer_user import User @@ -12,7 +13,9 @@ from sqlmodel import ( ForeignKey, Relationship, SQLModel, + select, ) +from sqlmodel.ext.asyncio.session import AsyncSession if TYPE_CHECKING: from .beatmap import Beatmap @@ -33,5 +36,22 @@ class PPBestScore(SQLModel, table=True): ) user: User = Relationship() - score: "Score" = Relationship() + score: "Score" = Relationship( + back_populates="ranked_score", + ) beatmap: "Beatmap" = Relationship() + + async def delete(self, session: AsyncSession): + from .score import calculate_user_pp + + gamemode = self.gamemode + user_id = self.user_id + await session.delete(self) + await session.flush() + + statistics = await session.exec( + select(UserStatistics).where(UserStatistics.user_id == user_id, UserStatistics.mode == gamemode) + ) + statistics = statistics.first() + if statistics: + statistics.pp, statistics.hit_accuracy = await calculate_user_pp(session, statistics.user_id, gamemode) diff --git a/app/database/score.py b/app/database/score.py index 3d3f6d2..16f0b86 100644 --- a/app/database/score.py +++ b/app/database/score.py @@ -32,6 +32,7 @@ from app.models.score import ( ScoreStatistics, SoloScoreSubmissionInfo, ) +from app.storage import StorageService from app.utils import utcnow from .beatmap import Beatmap, BeatmapResp @@ -40,6 +41,7 @@ from .beatmapset import BeatmapsetResp from .best_score import BestScore from .counts import MonthlyPlaycounts from .lazer_user import User, UserResp +from .playlist_best_score import PlaylistBestScore from .pp_best_score import PPBestScore from .relationship import ( Relationship as DBRelationship, @@ -95,6 +97,7 @@ class ScoreBase(AsyncAttrs, SQLModel, UTCBaseModel): beatmap_id: int = Field(index=True, foreign_key="beatmaps.id") maximum_statistics: ScoreStatistics = Field(sa_column=Column(JSON), default_factory=dict) processed: bool = False # solo_score + ranked: bool = False @field_validator("maximum_statistics", mode="before") @classmethod @@ -189,16 +192,57 @@ class Score(ScoreBase, table=True): # optional beatmap: Mapped[Beatmap] = Relationship() user: Mapped[User] = Relationship(sa_relationship_kwargs={"lazy": "joined"}) + best_score: Mapped[BestScore | None] = Relationship( + back_populates="score", + sa_relationship_kwargs={ + "cascade": "all, delete-orphan", + }, + ) + ranked_score: Mapped[PPBestScore | None] = Relationship( + back_populates="score", + sa_relationship_kwargs={ + "cascade": "all, delete-orphan", + }, + ) + playlist_item_score: Mapped[PlaylistBestScore | None] = Relationship( + back_populates="score", + sa_relationship_kwargs={ + "cascade": "all, delete-orphan", + }, + ) @property def is_perfect_combo(self) -> bool: return self.max_combo == self.beatmap.max_combo + @property + def replay_filename(self) -> str: + return f"replays/{self.id}_{self.beatmap_id}_{self.user_id}_lazer_replay.osr" + async def to_resp(self, session: AsyncSession, api_version: int) -> "ScoreResp | LegacyScoreResp": if api_version >= 20220705: return await ScoreResp.from_db(session, self) return await LegacyScoreResp.from_db(session, self) + async def delete( + self, + session: AsyncSession, + storage_service: StorageService, + ): + if await self.awaitable_attrs.best_score: + assert self.best_score is not None + await self.best_score.delete(session) + await session.refresh(self) + if await self.awaitable_attrs.ranked_score: + assert self.ranked_score is not None + await self.ranked_score.delete(session) + await session.refresh(self) + if await self.awaitable_attrs.playlist_item_score: + await session.delete(self.playlist_item_score) + + await storage_service.delete_file(self.replay_filename) + await session.delete(self) + class ScoreResp(ScoreBase): id: int @@ -218,7 +262,6 @@ class ScoreResp(ScoreBase): rank_country: int | None = None position: int | None = None scores_around: "ScoreAround | None" = None - ranked: bool = False current_user_attributes: CurrentUserAttributes | None = None @field_validator( @@ -335,7 +378,6 @@ class ScoreResp(ScoreBase): s.current_user_attributes = CurrentUserAttributes( pin=PinAttributes(is_pinned=bool(score.pinned_order), score_id=score.id) ) - s.ranked = s.pp > 0 return s @@ -977,6 +1019,7 @@ async def process_score( room_id=room_id, maximum_statistics=info.maximum_statistics, processed=True, + ranked=ranked, ) successed = True if can_get_pp: diff --git a/app/fetcher/_base.py b/app/fetcher/_base.py index 2274b01..86fc646 100644 --- a/app/fetcher/_base.py +++ b/app/fetcher/_base.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import time from urllib.parse import quote @@ -30,6 +31,7 @@ class BaseFetcher: self.token_expiry: int = 0 self.callback_url: str = callback_url self.scope = scope + self._token_lock = asyncio.Lock() @property def authorize_url(self) -> str: @@ -50,29 +52,33 @@ class BaseFetcher: """ 发送 API 请求 """ - # 检查 token 是否过期,如果过期则刷新 - if self.is_token_expired(): - await self.refresh_access_token() + await self._ensure_valid_access_token() - header = kwargs.pop("headers", {}) - header.update(self.header) + headers = kwargs.pop("headers", {}).copy() + attempt = 0 - async with AsyncClient() as client: - response = await client.request( - method, - url, - headers=header, - **kwargs, - ) + while attempt < 2: + request_headers = {**headers, **self.header} + request_kwargs = kwargs.copy() - # 处理 401 错误 - if response.status_code == 401: - logger.warning(f"Received 401 error for {url}") - await self._clear_tokens() - raise TokenAuthError(f"Authentication failed. Please re-authorize using: {self.authorize_url}") + async with AsyncClient() as client: + response = await client.request( + method, + url, + headers=request_headers, + **request_kwargs, + ) - response.raise_for_status() - return response.json() + if response.status_code != 401: + response.raise_for_status() + return response.json() + + attempt += 1 + logger.warning(f"Received 401 error for {url}, attempt {attempt}") + await self._handle_unauthorized() + + await self._clear_tokens() + raise TokenAuthError(f"Authentication failed. Please re-authorize using: {self.authorize_url}") def is_token_expired(self) -> bool: return self.token_expiry <= int(time.time()) @@ -105,40 +111,71 @@ class BaseFetcher: self.refresh_token, ) - async def refresh_access_token(self) -> None: - try: - logger.info(f"Refreshing access token for client {self.client_id}") - async with AsyncClient() as client: - response = await client.post( - "https://osu.ppy.sh/oauth/token", - data={ - "client_id": self.client_id, - "client_secret": self.client_secret, - "grant_type": "refresh_token", - "refresh_token": self.refresh_token, - }, - ) - response.raise_for_status() - token_data = response.json() - self.access_token = token_data["access_token"] - self.refresh_token = token_data.get("refresh_token", "") - self.token_expiry = int(time.time()) + token_data["expires_in"] - redis = get_redis() - await redis.set( - f"fetcher:access_token:{self.client_id}", - self.access_token, - ex=token_data["expires_in"], - ) - await redis.set( - f"fetcher:refresh_token:{self.client_id}", - self.refresh_token, - ) - logger.info(f"Successfully refreshed access token for client {self.client_id}") - except Exception as e: - logger.error(f"Failed to refresh access token for client {self.client_id}: {e}") - await self._clear_tokens() - logger.warning(f"Cleared invalid tokens. Please re-authorize: {self.authorize_url}") - raise + async def refresh_access_token(self, *, force: bool = False) -> None: + if not force and not self.is_token_expired(): + return + + async with self._token_lock: + if not force and not self.is_token_expired(): + return + + if force: + await self._clear_access_token() + + if not self.refresh_token: + logger.error(f"Missing refresh token for client {self.client_id}") + await self._clear_tokens() + raise TokenAuthError(f"Missing refresh token. Please re-authorize using: {self.authorize_url}") + + try: + logger.info(f"Refreshing access token for client {self.client_id}") + async with AsyncClient() as client: + response = await client.post( + "https://osu.ppy.sh/oauth/token", + data={ + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "refresh_token", + "refresh_token": self.refresh_token, + }, + ) + response.raise_for_status() + token_data = response.json() + self.access_token = token_data["access_token"] + self.refresh_token = token_data.get("refresh_token", self.refresh_token) + self.token_expiry = int(time.time()) + token_data["expires_in"] + redis = get_redis() + await redis.set( + f"fetcher:access_token:{self.client_id}", + self.access_token, + ex=token_data["expires_in"], + ) + await redis.set( + f"fetcher:refresh_token:{self.client_id}", + self.refresh_token, + ) + logger.info(f"Successfully refreshed access token for client {self.client_id}") + except Exception as e: + logger.error(f"Failed to refresh access token for client {self.client_id}: {e}") + await self._clear_tokens() + logger.warning(f"Cleared invalid tokens. Please re-authorize: {self.authorize_url}") + raise + + async def _ensure_valid_access_token(self) -> None: + if self.is_token_expired(): + await self.refresh_access_token() + + async def _handle_unauthorized(self) -> None: + await self.refresh_access_token(force=True) + + async def _clear_access_token(self) -> None: + logger.warning(f"Clearing access token for client {self.client_id}") + + self.access_token = "" + self.token_expiry = 0 + + redis = get_redis() + await redis.delete(f"fetcher:access_token:{self.client_id}") async def _clear_tokens(self) -> None: """ diff --git a/app/router/private/__init__.py b/app/router/private/__init__.py index 297c6b4..f3a9590 100644 --- a/app/router/private/__init__.py +++ b/app/router/private/__init__.py @@ -2,7 +2,7 @@ from __future__ import annotations from app.config import settings -from . import audio_proxy, avatar, beatmapset_ratings, cover, oauth, relationship, team, username # noqa: F401 +from . import audio_proxy, avatar, beatmapset, cover, oauth, relationship, score, team, username # noqa: F401 from .router import router as private_router if settings.enable_totp_verification: diff --git a/app/router/private/beatmapset_ratings.py b/app/router/private/beatmapset.py similarity index 69% rename from app/router/private/beatmapset_ratings.py rename to app/router/private/beatmapset.py index 5c39bc9..3c78028 100644 --- a/app/router/private/beatmapset_ratings.py +++ b/app/router/private/beatmapset.py @@ -7,10 +7,12 @@ from app.database.lazer_user import User from app.database.score import Score from app.dependencies.database import Database from app.dependencies.user import get_client_user +from app.service.beatmapset_update_service import get_beatmapset_update_service from .router import router -from fastapi import Body, HTTPException, Security +from fastapi import Body, Depends, HTTPException, Security +from fastapi_limiter.depends import RateLimiter from sqlmodel import col, exists, select @@ -82,3 +84,36 @@ async def rate_beatmaps( new_rating: BeatmapRating = BeatmapRating(beatmapset_id=beatmapset_id, user_id=user_id, rating=rating) session.add(new_rating) await session.commit() + + +@router.post( + "/beatmapsets/{beatmapset_id}/sync", + name="请求同步谱面集", + status_code=202, + tags=["谱面集", "g0v0 API"], + dependencies=[Depends(RateLimiter(times=50, hours=1))], +) +async def sync_beatmapset( + beatmapset_id: int, + session: Database, + current_user: User = Security(get_client_user), +): + """请求同步谱面集 + + 请求将指定的谱面集从 Bancho 同步到服务器 + + 请求发送后会加入同步队列,等待自动同步 + + 速率限制: + - 每个用户每小时最多50次请求 + + 参数: + - beatmapset_id: 谱面集ID + + 错误情况: + - 404: 找不到指定谱面集 + """ + current_beatmapset = (await session.exec(select(exists()).where(Beatmapset.id == beatmapset_id))).first() + if not current_beatmapset: + raise HTTPException(404, "Beatmapset Not Found") + await get_beatmapset_update_service().add_missing_beatmapset(beatmapset_id) diff --git a/app/router/private/score.py b/app/router/private/score.py new file mode 100644 index 0000000..44fdc2a --- /dev/null +++ b/app/router/private/score.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from app.database.lazer_user import User +from app.database.score import Score +from app.dependencies.database import Database, get_redis +from app.dependencies.storage import get_storage_service +from app.dependencies.user import get_client_user +from app.service.user_cache_service import refresh_user_cache_background +from app.storage.base import StorageService + +from .router import router + +from fastapi import BackgroundTasks, Depends, HTTPException, Security +from redis.asyncio import Redis + + +@router.delete( + "/score/{score_id}", + name="删除指定ID的成绩", + tags=["成绩", "g0v0 API"], + status_code=204, +) +async def delete_score( + session: Database, + background_task: BackgroundTasks, + score_id: int, + redis: Redis = Depends(get_redis), + current_user: User = Security(get_client_user), + storage_service: StorageService = Depends(get_storage_service), +): + """删除成绩 + + 删除成绩,同时删除对应的统计信息、排行榜分数、pp、回放文件 + + 参数: + - score_id: 成绩ID + + 错误情况: + - 404: 找不到指定成绩 + """ + score = await session.get(Score, score_id) + if not score or score.user_id != current_user.id: + raise HTTPException(status_code=404, detail="找不到指定成绩") + + gamemode = score.gamemode + user_id = score.user_id + await score.delete(session, storage_service) + await session.commit() + background_task.add_task(refresh_user_cache_background, redis, user_id, gamemode) diff --git a/app/router/v1/replay.py b/app/router/v1/replay.py index 71ff6e4..e37057e 100644 --- a/app/router/v1/replay.py +++ b/app/router/v1/replay.py @@ -69,7 +69,7 @@ async def download_replay( except KeyError: raise HTTPException(status_code=400, detail="Invalid request") - filepath = f"replays/{score_record.id}_{score_record.beatmap_id}_{score_record.user_id}_lazer_replay.osr" + filepath = score_record.replay_filename if not await storage_service.is_exists(filepath): raise HTTPException(status_code=404, detail="Replay file not found") diff --git a/app/router/v2/score.py b/app/router/v2/score.py index 468ae94..06de17a 100644 --- a/app/router/v2/score.py +++ b/app/router/v2/score.py @@ -37,7 +37,7 @@ from app.database.score import ( process_user, ) from app.dependencies.api_version import APIVersion -from app.dependencies.database import Database, get_redis, with_db +from app.dependencies.database import Database, get_redis from app.dependencies.fetcher import get_fetcher from app.dependencies.storage import get_storage_service from app.dependencies.user import get_client_user, get_current_user @@ -50,7 +50,7 @@ from app.models.score import ( Rank, SoloScoreSubmissionInfo, ) -from app.service.user_cache_service import get_user_cache_service +from app.service.user_cache_service import refresh_user_cache_background from app.storage.base import StorageService from app.utils import utcnow @@ -222,22 +222,11 @@ async def submit_score( await db.commit() if user_id is not None: - background_task.add_task(_refresh_user_cache_background, redis, user_id, score_gamemode) + background_task.add_task(refresh_user_cache_background, redis, user_id, score_gamemode) background_task.add_task(process_user_achievement, resp.id) return resp -async def _refresh_user_cache_background(redis: Redis, user_id: int, mode: GameMode): - """后台任务:刷新用户缓存""" - try: - user_cache_service = get_user_cache_service(redis) - # 创建独立的数据库会话 - async with with_db() as session: - await user_cache_service.refresh_user_cache_on_score_submit(session, user_id, mode) - except Exception as e: - logger.error(f"Failed to refresh user cache after score submit: {e}") - - async def _preload_beatmap_for_pp_calculation(beatmap_id: int) -> None: """ 预缓存beatmap文件以加速PP计算 @@ -949,7 +938,7 @@ async def download_score_replay( if not score: raise HTTPException(status_code=404, detail="Score not found") - filepath = f"replays/{score.id}_{score.beatmap_id}_{score.user_id}_lazer_replay.osr" + filepath = score.replay_filename if not await storage_service.is_exists(filepath): raise HTTPException(status_code=404, detail="Replay file not found") diff --git a/app/service/beatmapset_update_service.py b/app/service/beatmapset_update_service.py new file mode 100644 index 0000000..5ced2fd --- /dev/null +++ b/app/service/beatmapset_update_service.py @@ -0,0 +1,371 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from enum import Enum +import math +import random +from typing import TYPE_CHECKING, NamedTuple + +from app.config import OldScoreProcessingMode, settings +from app.database.beatmap import Beatmap +from app.database.beatmap_sync import BeatmapSync, SavedBeatmapMeta +from app.database.beatmapset import Beatmapset, BeatmapsetResp +from app.database.score import Score +from app.dependencies.database import with_db +from app.dependencies.scheduler import get_scheduler +from app.dependencies.storage import get_storage_service +from app.log import logger +from app.models.beatmap import BeatmapRankStatus +from app.utils import bg_tasks, utcnow + +from httpx import HTTPError +from sqlmodel import col, select + +if TYPE_CHECKING: + from app.fetcher import Fetcher + + +class BeatmapChangeType(Enum): + MAP_UPDATED = "map_updated" + MAP_DELETED = "map_deleted" + MAP_ADDED = "map_added" + STATUS_CHANGED = "status_changed" + + +class BeatmapsetChangeType(Enum): + STATUS_CHANGED = "status_changed" + HYPE_CHANGED = "hype_changed" + NOMINATIONS_CHANGED = "nominations_changed" + RANKED_DATE_CHANGED = "ranked_date_changed" + PLAYCOUNT_CHANGED = "playcount_changed" + + +class ChangedBeatmap(NamedTuple): + beatmap_id: int + type: BeatmapChangeType + + +BASE = 1200 +TAU = 3600 +JITTER_MIN = -30 +JITTER_MAX = 30 +MIN_DELTA = 1200 +GROWTH = 2.0 +GRAVEYARD_DOUBLING_PERIOD_DAYS = 30 +GRAVEYARD_MAX_DAYS = 365 +STATUS_FACTOR: dict[BeatmapRankStatus, float] = { + BeatmapRankStatus.WIP: 0.5, + BeatmapRankStatus.PENDING: 0.5, + BeatmapRankStatus.GRAVEYARD: 1, +} +SCHEDULER_INTERVAL_MINUTES = 2 + + +class ProcessingBeatmapset: + def __init__(self, beatmapset: BeatmapsetResp, record: BeatmapSync) -> None: + self.beatmapset = beatmapset + self.status = BeatmapRankStatus(self.beatmapset.ranked) + self.record = record + + def calculate_next_sync_time( + self, + ) -> timedelta | None: + if self.status.has_pp() or self.status == BeatmapRankStatus.LOVED: + return None + + now = utcnow() + if self.status == BeatmapRankStatus.QUALIFIED: + assert self.beatmapset.ranked_date is not None, "ranked_date should not be None for qualified maps" + time_to_ranked = (self.beatmapset.ranked_date + timedelta(days=7) - now).total_seconds() + baseline = max(MIN_DELTA, time_to_ranked / 2) + next_delta = max(MIN_DELTA, baseline) + elif self.status in {BeatmapRankStatus.WIP, BeatmapRankStatus.PENDING}: + seconds_since_update = (now - self.beatmapset.last_updated).total_seconds() + factor_update = max(1.0, seconds_since_update / TAU) + factor_play = 1.0 + math.log(1.0 + self.beatmapset.play_count) + status_factor = STATUS_FACTOR[self.status] + baseline = BASE * factor_play / factor_update * status_factor + next_delta = max(MIN_DELTA, baseline * (GROWTH ** (self.record.consecutive_no_change + 1))) + elif self.status == BeatmapRankStatus.GRAVEYARD: + days_since_update = (now - self.beatmapset.last_updated).days + doubling_periods = days_since_update / GRAVEYARD_DOUBLING_PERIOD_DAYS + delta = MIN_DELTA * (2**doubling_periods) + max_seconds = GRAVEYARD_MAX_DAYS * 86400 + next_delta = min(max_seconds, delta) + else: + next_delta = MIN_DELTA + jitter = timedelta(minutes=random.randint(JITTER_MIN, JITTER_MAX)) + return timedelta(seconds=next_delta) + jitter + + @property + def beatmapset_changed(self) -> bool: + return self.record.beatmap_status != BeatmapRankStatus(self.beatmapset.ranked) + + @property + def changed_beatmaps(self) -> list[ChangedBeatmap]: + changed_beatmaps = [] + for bm in self.beatmapset.beatmaps: + saved = next((s for s in self.record.beatmaps if s["beatmap_id"] == bm.id), None) + if not saved: + changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.MAP_ADDED)) + elif saved["is_deleted"]: + changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.MAP_ADDED)) + elif saved["md5"] != bm.checksum: + changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.MAP_UPDATED)) + elif saved["beatmap_status"] != BeatmapRankStatus(bm.ranked): + changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.STATUS_CHANGED)) + for saved in self.record.beatmaps: + if not any(bm.id == saved["beatmap_id"] for bm in self.beatmapset.beatmaps) and not saved["is_deleted"]: + changed_beatmaps.append(ChangedBeatmap(saved["beatmap_id"], BeatmapChangeType.MAP_DELETED)) + return changed_beatmaps + + +class BeatmapsetUpdateService: + def __init__(self, fetcher: "Fetcher"): + self.fetcher = fetcher + + async def add_missing_beatmapset(self, beatmapset_id: int) -> bool: + beatmapset = await self.fetcher.get_beatmapset(beatmapset_id) + await self.add(beatmapset) + status = BeatmapRankStatus(beatmapset.ranked) + if status.has_pp() or status == BeatmapRankStatus.LOVED: + return False + logger.opt(colors=True).debug( + f"[BeatmapsetUpdateService] added missing beatmapset {beatmapset_id} " + ) + return True + + async def add_missing_beatmapsets(self): + async with with_db() as session: + missings = await session.exec( + select(Beatmapset.id).where( + col(Beatmapset.beatmap_status).in_( + [ + BeatmapRankStatus.WIP, + BeatmapRankStatus.PENDING, + BeatmapRankStatus.GRAVEYARD, + BeatmapRankStatus.QUALIFIED, + ] + ), + col(Beatmapset.id).notin_(select(BeatmapSync.beatmapset_id)), + ) + ) + total = 0 + for missing in missings: + if await self.add_missing_beatmapset(missing): + total += 1 + if total > 0: + logger.opt(colors=True).info(f"[BeatmapsetUpdateService] added {total} missing beatmapset") + + async def add(self, beatmapset: BeatmapsetResp): + if ( + BeatmapRankStatus(beatmapset.ranked).has_pp() + or BeatmapRankStatus(beatmapset.ranked) == BeatmapRankStatus.LOVED + ): + return + async with with_db() as session: + sync_record = await session.get(BeatmapSync, beatmapset.id) + if not sync_record: + sync_record = BeatmapSync( + beatmapset_id=beatmapset.id, + beatmaps=[ + SavedBeatmapMeta( + beatmap_id=bm.id, + md5=bm.checksum, + is_deleted=False, + beatmap_status=BeatmapRankStatus(bm.ranked), + ) + for bm in beatmapset.beatmaps + ], + beatmap_status=BeatmapRankStatus(beatmapset.ranked), + ) + session.add(sync_record) + await session.commit() + await session.refresh(sync_record) + else: + sync_record.beatmaps = [ + SavedBeatmapMeta( + beatmap_id=bm.id, md5=bm.checksum, is_deleted=False, beatmap_status=BeatmapRankStatus(bm.ranked) + ) + for bm in beatmapset.beatmaps + ] + sync_record.beatmap_status = BeatmapRankStatus(beatmapset.ranked) + + processing = ProcessingBeatmapset(beatmapset, sync_record) + next_time_delta = processing.calculate_next_sync_time() + if not next_time_delta: + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [{beatmapset.id}] " + "beatmapset has transformed to ranked or loved," + " removing from sync list" + ) + await session.delete(sync_record) + await session.commit() + return + sync_record.next_sync_time = utcnow() + next_time_delta + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [{beatmapset.id}] next sync at {sync_record.next_sync_time}" + ) + await session.commit() + + async def _update_beatmaps(self): + async with with_db() as session: + logger.opt(colors=True).info("[BeatmapsetUpdateService] checking for beatmapset updates...") + now = utcnow() + records = await session.exec(select(BeatmapSync).where(BeatmapSync.next_sync_time <= now)) + for record in records: + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [{record.beatmapset_id}] syncing..." + ) + try: + beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id) + except Exception as e: + if isinstance(e, HTTPError): + logger.warning( + f"[BeatmapsetUpdateService] [{record.beatmapset_id}] " + f"failed to fetch beatmapset: {e}, retrying later" + ) + else: + logger.exception( + f"[BeatmapsetUpdateService] [{record.beatmapset_id}] " + f"unexpected error: {e}, retrying later" + ) + record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA) + continue + processing = ProcessingBeatmapset(beatmapset, record) + changed_beatmaps = processing.changed_beatmaps + changed = processing.beatmapset_changed or changed_beatmaps + if changed: + record.beatmaps = [ + SavedBeatmapMeta( + beatmap_id=bm.id, + md5=bm.checksum, + is_deleted=False, + beatmap_status=BeatmapRankStatus(bm.ranked), + ) + for bm in beatmapset.beatmaps + ] + record.beatmap_status = BeatmapRankStatus(beatmapset.ranked) + record.consecutive_no_change = 0 + + bg_tasks.add_task( + self._process_changed_beatmaps, + changed_beatmaps, + ) + bg_tasks.add_task( + self._process_changed_beatmapset, + beatmapset, + ) + else: + record.consecutive_no_change += 1 + + next_time_delta = processing.calculate_next_sync_time() + if not next_time_delta: + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [{record.beatmapset_id}] beatmapset " + "has transformed to ranked or loved," + " removing from sync list" + ) + await session.delete(record) + else: + record.next_sync_time = utcnow() + next_time_delta + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [{record.beatmapset_id}] " + f"next sync at {record.next_sync_time}" + ) + await session.commit() + + async def _process_changed_beatmapset(self, beatmapset: BeatmapsetResp): + async with with_db() as session: + db_beatmapset = await session.get(Beatmapset, beatmapset.id) + new_beatmapset = await Beatmapset.from_resp_no_save(session, beatmapset) + if db_beatmapset: + await session.merge(new_beatmapset) + await session.commit() + + async def _process_changed_beatmaps(self, changed: list[ChangedBeatmap]): + storage_service = get_storage_service() + async with with_db() as session: + + async def _process_update_or_delete_beatmaps(beatmap_id: int): + scores = await session.exec(select(Score).where(Score.beatmap_id == beatmap_id)) + total = 0 + for score in scores: + if settings.old_score_processing_mode == OldScoreProcessingMode.STRICT: + await score.delete(session, storage_service) + elif settings.old_score_processing_mode == OldScoreProcessingMode.NORMAL: + if await score.awaitable_attrs.best_score: + assert score.best_score is not None + await score.best_score.delete(session) + if await score.awaitable_attrs.ranked_score: + assert score.ranked_score is not None + await score.ranked_score.delete(session) + total += 1 + if total > 0: + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [beatmap: {beatmap_id}] processed {total} old scores" + ) + await session.commit() + + for change in changed: + if change.type == BeatmapChangeType.MAP_ADDED: + try: + beatmap = await self.fetcher.get_beatmap(change.beatmap_id) + except Exception as e: + logger.opt(colors=True).error( + f"[BeatmapsetUpdateService] [beatmap: {change.beatmap_id}] " + f"failed to fetch added beatmap: {e}, skipping" + ) + continue + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [{beatmap.beatmapset_id}] adding beatmap {beatmap.id}" + ) + await Beatmap.from_resp_no_save(session, beatmap) + else: + try: + beatmap = await self.fetcher.get_beatmap(change.beatmap_id) + except Exception as e: + logger.opt(colors=True).error( + f"[BeatmapsetUpdateService] [beatmap: {change.beatmap_id}] " + f"failed to fetch changed beatmap: {e}, skipping" + ) + continue + logger.opt(colors=True).info( + f"[BeatmapsetUpdateService] [{beatmap.beatmapset_id}] processing beatmap " + f"{beatmap.id} change {change.type}" + ) + new_db_beatmap = await Beatmap.from_resp_no_save(session, beatmap) + existing_beatmap = await session.get(Beatmap, change.beatmap_id) + if existing_beatmap: + await session.merge(new_db_beatmap) + await session.commit() + if change.type != BeatmapChangeType.STATUS_CHANGED: + await _process_update_or_delete_beatmaps(change.beatmap_id) + + +service: BeatmapsetUpdateService | None = None + + +def init_beatmapset_update_service(fetcher: "Fetcher") -> BeatmapsetUpdateService: + global service + if service is None: + service = BeatmapsetUpdateService(fetcher) + bg_tasks.add_task(service.add_missing_beatmapsets) + return service + + +def get_beatmapset_update_service() -> BeatmapsetUpdateService: + assert service is not None, "BeatmapsetUpdateService is not initialized" + return service + + +@get_scheduler().scheduled_job( + "interval", + id="update_beatmaps", + minutes=SCHEDULER_INTERVAL_MINUTES, + next_run_time=datetime.now() + timedelta(minutes=1), +) +async def beatmapset_update_job(): + if service is not None: + bg_tasks.add_task(service.add_missing_beatmapsets) + await service._update_beatmaps() diff --git a/app/service/user_cache_service.py b/app/service/user_cache_service.py index c9d5856..f95bcfc 100644 --- a/app/service/user_cache_service.py +++ b/app/service/user_cache_service.py @@ -14,6 +14,7 @@ from app.const import BANCHOBOT_ID from app.database import User, UserResp from app.database.lazer_user import SEARCH_INCLUDED from app.database.score import LegacyScoreResp, ScoreResp +from app.dependencies.database import with_db from app.log import logger from app.models.score import GameMode from app.service.asset_proxy_service import get_asset_proxy_service @@ -382,3 +383,14 @@ def get_user_cache_service(redis: Redis) -> UserCacheService: if _user_cache_service is None: _user_cache_service = UserCacheService(redis) return _user_cache_service + + +async def refresh_user_cache_background(redis: Redis, user_id: int, mode: GameMode): + """后台任务:刷新用户缓存""" + try: + user_cache_service = get_user_cache_service(redis) + # 创建独立的数据库会话 + async with with_db() as session: + await user_cache_service.refresh_user_cache_on_score_submit(session, user_id, mode) + except Exception as e: + logger.error(f"Failed to refresh user cache after score submit: {e}") diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index bc5acd5..05fbc42 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -134,7 +134,7 @@ async def save_replay( data.extend(compressed) storage_service = get_storage_service() - replay_path = f"replays/{score.id}_{score.beatmap_id}_{score.user_id}_lazer_replay.osr" + replay_path = score.replay_filename await storage_service.write_file(replay_path, bytes(data), "application/x-osu-replay") diff --git a/main.py b/main.py index 36700b0..19cbde7 100644 --- a/main.py +++ b/main.py @@ -30,6 +30,7 @@ from app.scheduler.database_cleanup_scheduler import ( stop_database_cleanup_scheduler, ) from app.service.beatmap_download_service import download_service +from app.service.beatmapset_update_service import init_beatmapset_update_service from app.service.calculate_all_user_rank import calculate_user_rank from app.service.create_banchobot import create_banchobot from app.service.daily_challenge import daily_challenge_job, process_daily_challenge_top @@ -55,7 +56,7 @@ async def lifespan(app: FastAPI): init_mods() init_ranked_mods() await FastAPILimiter.init(get_redis()) - await get_fetcher() # 初始化 fetcher + fetcher = await get_fetcher() # 初始化 fetcher await init_geoip() # 初始化 GeoIP 数据库 await create_rx_statistics() await calculate_user_rank(True) @@ -68,6 +69,7 @@ async def lifespan(app: FastAPI): await download_service.start_health_check() # 启动下载服务健康检查 await start_cache_scheduler() # 启动缓存调度器 await start_database_cleanup_scheduler() # 启动数据库清理调度器 + init_beatmapset_update_service(fetcher) # 初始化谱面集更新服务 redis_message_system.start() # 启动 Redis 消息系统 load_achievements() diff --git a/migrations/versions/2025-09-30_dc2087561edf_score_save_ranked_into_database.py b/migrations/versions/2025-09-30_dc2087561edf_score_save_ranked_into_database.py new file mode 100644 index 0000000..cba3022 --- /dev/null +++ b/migrations/versions/2025-09-30_dc2087561edf_score_save_ranked_into_database.py @@ -0,0 +1,61 @@ +"""score: save ranked into database + +Revision ID: dc2087561edf +Revises: 9419272e4c85 +Create Date: 2025-09-30 10:44:25.286498 + +""" + +from __future__ import annotations + +from collections.abc import Sequence + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision: str = "dc2087561edf" +down_revision: str | Sequence[str] | None = "9419272e4c85" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("scores", sa.Column("ranked", sa.Boolean(), nullable=True)) + op.execute("UPDATE scores SET ranked = pp > 0") + op.alter_column("scores", "ranked", nullable=False, type_=sa.Boolean(), existing_type=sa.Boolean()) + + op.drop_constraint(op.f("best_scores_ibfk_2"), "best_scores", type_="foreignkey") + op.create_foreign_key(op.f("best_scores_ibfk_2"), "best_scores", "scores", ["score_id"], ["id"], ondelete="CASCADE") + op.drop_constraint(op.f("playlist_best_scores_ibfk_3"), "playlist_best_scores", type_="foreignkey") + op.create_foreign_key( + op.f("playlist_best_scores_ibfk_3"), "playlist_best_scores", "scores", ["score_id"], ["id"], ondelete="CASCADE" + ) + op.drop_constraint(op.f("total_score_best_scores_ibfk_2"), "total_score_best_scores", type_="foreignkey") + op.create_foreign_key( + op.f("total_score_best_scores_ibfk_2"), + "total_score_best_scores", + "scores", + ["score_id"], + ["id"], + ondelete="CASCADE", + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("scores", "ranked") + + op.drop_constraint(op.f("best_scores_ibfk_2"), "best_scores", type_="foreignkey") + op.create_foreign_key(op.f("best_scores_ibfk_2"), "best_scores", "scores", ["score_id"], ["id"]) + op.drop_constraint(op.f("playlist_best_scores_ibfk_3"), "playlist_best_scores", type_="foreignkey") + op.create_foreign_key(op.f("playlist_best_scores_ibfk_3"), "playlist_best_scores", "scores", ["score_id"], ["id"]) + op.drop_constraint(op.f("total_score_best_scores_ibfk_2"), "total_score_best_scores", type_="foreignkey") + op.create_foreign_key( + op.f("total_score_best_scores_ibfk_2"), "total_score_best_scores", "scores", ["score_id"], ["id"] + ) + # ### end Alembic commands ### diff --git a/migrations/versions/2025-10-01_2885978490dc_sync_add_beatmap_sync_table.py b/migrations/versions/2025-10-01_2885978490dc_sync_add_beatmap_sync_table.py new file mode 100644 index 0000000..3e0ea13 --- /dev/null +++ b/migrations/versions/2025-10-01_2885978490dc_sync_add_beatmap_sync_table.py @@ -0,0 +1,54 @@ +"""sync: add beatmap sync table + +Revision ID: 2885978490dc +Revises: dc2087561edf +Create Date: 2025-10-01 12:19:50.485318 + +""" + +from __future__ import annotations + +from collections.abc import Sequence + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision: str = "2885978490dc" +down_revision: str | Sequence[str] | None = "dc2087561edf" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "beatmapsync", + sa.Column("beatmapset_id", sa.Integer(), nullable=False), + sa.Column("beatmaps", sa.JSON(), nullable=True), + sa.Column( + "beatmap_status", + sa.Enum( + "GRAVEYARD", "WIP", "PENDING", "RANKED", "APPROVED", "QUALIFIED", "LOVED", name="beatmaprankstatus" + ), + nullable=False, + ), + sa.Column("consecutive_no_change", sa.Integer(), nullable=False), + sa.Column("next_sync_time", sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint( + ["beatmapset_id"], + ["beatmapsets.id"], + ), + sa.PrimaryKeyConstraint("beatmapset_id"), + ) + op.create_index(op.f("ix_beatmapsync_beatmap_status"), "beatmapsync", ["beatmap_status"], unique=False) + op.create_index(op.f("ix_beatmapsync_next_sync_time"), "beatmapsync", ["next_sync_time"], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("beatmapsync") + # ### end Alembic commands ### diff --git a/migrations/versions/2025-10-01_b1ac2154bd0d_sync_add_updated_at.py b/migrations/versions/2025-10-01_b1ac2154bd0d_sync_add_updated_at.py new file mode 100644 index 0000000..679a8bb --- /dev/null +++ b/migrations/versions/2025-10-01_b1ac2154bd0d_sync_add_updated_at.py @@ -0,0 +1,35 @@ +"""sync: add updated_at + +Revision ID: b1ac2154bd0d +Revises: 2885978490dc +Create Date: 2025-10-01 14:56:08.539694 + +""" + +from __future__ import annotations + +from collections.abc import Sequence + +from alembic import op +import sqlalchemy as sa +import sqlmodel + +# revision identifiers, used by Alembic. +revision: str = "b1ac2154bd0d" +down_revision: str | Sequence[str] | None = "2885978490dc" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.add_column("beatmapsync", sa.Column("updated_at", sa.DateTime(), nullable=True)) + op.execute(sqlmodel.text("UPDATE beatmapsync SET updated_at = NOW() WHERE updated_at IS NULL")) + op.alter_column("beatmapsync", "updated_at", nullable=False, type_=sa.DateTime()) + op.create_index(op.f("ix_beatmapsync_updated_at"), "beatmapsync", ["updated_at"], unique=False) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_index(op.f("ix_beatmapsync_updated_at"), table_name="beatmapsync") + op.drop_column("beatmapsync", "updated_at")