refactor(score): replace MODE_TO_INT INT_TO_MODE with int(mode) GameMode.from_int

This commit is contained in:
MingxuanGame
2025-08-14 13:04:59 +00:00
parent a8906b8194
commit 46a1d049fe
11 changed files with 153 additions and 117 deletions

View File

@@ -7,7 +7,7 @@ from app.calculator import calculate_beatmap_attribute
from app.config import settings
from app.models.beatmap import BeatmapAttributes, BeatmapRankStatus
from app.models.mods import APIMod
from app.models.score import MODE_TO_INT, GameMode
from app.models.score import GameMode
from .beatmap_playcounts import BeatmapPlaycounts
from .beatmapset import Beatmapset, BeatmapsetResp
@@ -176,7 +176,7 @@ class BeatmapResp(BeatmapBase):
else:
beatmap_["status"] = beatmap_status.name.lower()
beatmap_["ranked"] = beatmap_status.value
beatmap_["mode_int"] = MODE_TO_INT[beatmap.mode]
beatmap_["mode_int"] = int(beatmap.mode)
if not from_set:
beatmap_["beatmapset"] = await BeatmapsetResp.from_db(
beatmap.beatmapset, session=session, user=user

View File

@@ -24,8 +24,6 @@ from app.models.model import (
)
from app.models.mods import APIMod, mods_can_get_pp
from app.models.score import (
INT_TO_MODE,
MODE_TO_INT,
GameMode,
HitResult,
LeaderboardType,
@@ -189,7 +187,7 @@ class ScoreResp(ScoreBase):
)
s.is_perfect_combo = s.max_combo == s.beatmap.max_combo
s.legacy_perfect = s.max_combo == s.beatmap.max_combo
s.ruleset_id = MODE_TO_INT[score.gamemode]
s.ruleset_id = int(score.gamemode)
best_id = await get_best_id(session, score.id)
if best_id:
s.best_id = best_id
@@ -728,7 +726,7 @@ async def process_score(
acronyms = [mod["acronym"] for mod in info.mods]
is_rx = "RX" in acronyms
is_ap = "AP" in acronyms
gamemode = INT_TO_MODE[info.ruleset_id]
gamemode = GameMode.from_int(info.ruleset_id)
if settings.enable_osu_rx and is_rx and gamemode == GameMode.OSU:
gamemode = GameMode.OSURX
elif settings.enable_osu_ap and is_ap and gamemode == GameMode.OSU:

View File

@@ -31,17 +31,35 @@ class GameMode(str, Enum):
GameMode.OSUAP: rosu.GameMode.Osu,
}[self]
def __int__(self) -> int:
return {
GameMode.OSU: 0,
GameMode.TAIKO: 1,
GameMode.FRUITS: 2,
GameMode.MANIA: 3,
GameMode.OSURX: 0,
GameMode.OSUAP: 0,
}[self]
MODE_TO_INT = {
GameMode.OSU: 0,
GameMode.TAIKO: 1,
GameMode.FRUITS: 2,
GameMode.MANIA: 3,
GameMode.OSURX: 0,
GameMode.OSUAP: 0,
}
INT_TO_MODE = {v: k for k, v in MODE_TO_INT.items()}
INT_TO_MODE[0] = GameMode.OSU
@classmethod
def from_int(cls, v: int) -> "GameMode":
return {
0: GameMode.OSU,
1: GameMode.TAIKO,
2: GameMode.FRUITS,
3: GameMode.MANIA,
}[v]
@classmethod
def from_int_extra(cls, v: int) -> "GameMode":
return {
0: GameMode.OSU,
1: GameMode.TAIKO,
2: GameMode.FRUITS,
3: GameMode.MANIA,
4: GameMode.OSURX,
5: GameMode.OSUAP,
}[v]
class Rank(str, Enum):

View File

@@ -13,7 +13,7 @@ from app.dependencies.fetcher import get_fetcher
from app.fetcher import Fetcher
from app.models.beatmap import BeatmapRankStatus, Genre, Language
from app.models.mods import int_to_mods
from app.models.score import MODE_TO_INT, GameMode
from app.models.score import GameMode
from .router import AllStrModel, router
@@ -100,7 +100,7 @@ class V1Beatmap(AllStrModel):
total_length=db_beatmap.total_length,
version=db_beatmap.version,
file_md5=db_beatmap.checksum,
mode=MODE_TO_INT[db_beatmap.mode],
mode=int(db_beatmap.mode),
tags=db_beatmap.beatmapset.tags,
favourite_count=(
await session.exec(

View File

@@ -9,7 +9,7 @@ from app.database.score import Score
from app.dependencies.database import get_db
from app.dependencies.storage import get_storage_service
from app.models.mods import int_to_mods
from app.models.score import INT_TO_MODE
from app.models.score import GameMode
from app.storage import StorageService
from .router import router
@@ -35,7 +35,10 @@ async def download_replay(
beatmap: int = Query(..., alias="b", description="谱面 ID"),
user: str = Query(..., alias="u", description="用户"),
ruleset_id: int | None = Query(
None, alias="m", description="Ruleset ID", ge=0, le=3
None,
alias="m",
description="Ruleset ID",
ge=0,
),
score_id: int | None = Query(None, alias="s", description="成绩 ID"),
type: Literal["string", "id"] | None = Query(
@@ -51,22 +54,25 @@ async def download_replay(
if score_record is None:
raise HTTPException(status_code=404, detail="Score not found")
else:
score_record = (
await session.exec(
select(Score).where(
Score.beatmap_id == beatmap,
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.mods == mods_,
Score.gamemode == INT_TO_MODE[ruleset_id]
if ruleset_id is not None
else True,
try:
score_record = (
await session.exec(
select(Score).where(
Score.beatmap_id == beatmap,
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.mods == mods_,
Score.gamemode == GameMode.from_int_extra(ruleset_id)
if ruleset_id is not None
else True,
)
)
)
).first()
if score_record is None:
raise HTTPException(status_code=404, detail="Score not found")
).first()
if score_record is None:
raise HTTPException(status_code=404, detail="Score not found")
except KeyError:
raise HTTPException(status_code=400, detail="Invalid request")
filepath = (
f"replays/{score_record.id}_{score_record.beatmap_id}"

View File

@@ -22,4 +22,6 @@ class AllStrModel(BaseModel):
return v.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(v, bool):
return "1" if v else "0"
elif isinstance(v, list):
return [self.serialize_datetime(item, _info) for item in v]
return str(v)

View File

@@ -7,11 +7,11 @@ from app.database.pp_best_score import PPBestScore
from app.database.score import Score, get_leaderboard
from app.dependencies.database import get_db
from app.models.mods import int_to_mods, mods_to_int
from app.models.score import INT_TO_MODE, LeaderboardType
from app.models.score import GameMode, LeaderboardType
from .router import AllStrModel, router
from fastapi import Depends, Query
from fastapi import Depends, HTTPException, Query
from sqlalchemy.orm import joinedload
from sqlmodel import col, exists, select
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -69,29 +69,32 @@ class V1Score(AllStrModel):
)
async def get_user_best(
user: str = Query(..., alias="u", description="用户"),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0, le=3),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0),
type: Literal["string", "id"] | None = Query(
None, description="用户类型string 用户名称 / id 用户 ID"
),
limit: int = Query(10, ge=1, le=100, description="返回的成绩数量"),
session: AsyncSession = Depends(get_db),
):
scores = (
await session.exec(
select(Score)
.where(
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.gamemode == INT_TO_MODE[ruleset_id],
exists().where(col(PPBestScore.score_id) == Score.id),
try:
scores = (
await session.exec(
select(Score)
.where(
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.gamemode == GameMode.from_int_extra(ruleset_id),
exists().where(col(PPBestScore.score_id) == Score.id),
)
.order_by(col(Score.pp).desc())
.options(joinedload(Score.beatmap))
.limit(limit)
)
.order_by(col(Score.pp).desc())
.options(joinedload(Score.beatmap))
.limit(limit)
)
).all()
return [await V1Score.from_db(score) for score in scores]
).all()
return [await V1Score.from_db(score) for score in scores]
except KeyError:
raise HTTPException(400, "Invalid request")
@router.get(
@@ -102,29 +105,32 @@ async def get_user_best(
)
async def get_user_recent(
user: str = Query(..., alias="u", description="用户"),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0, le=3),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0),
type: Literal["string", "id"] | None = Query(
None, description="用户类型string 用户名称 / id 用户 ID"
),
limit: int = Query(10, ge=1, le=100, description="返回的成绩数量"),
session: AsyncSession = Depends(get_db),
):
scores = (
await session.exec(
select(Score)
.where(
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.gamemode == INT_TO_MODE[ruleset_id],
Score.ended_at > datetime.now(UTC) - timedelta(hours=24),
try:
scores = (
await session.exec(
select(Score)
.where(
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.gamemode == GameMode.from_int_extra(ruleset_id),
Score.ended_at > datetime.now(UTC) - timedelta(hours=24),
)
.order_by(col(Score.pp).desc())
.options(joinedload(Score.beatmap))
.limit(limit)
)
.order_by(col(Score.pp).desc())
.options(joinedload(Score.beatmap))
.limit(limit)
)
).all()
return [await V1Score.from_db(score) for score in scores]
).all()
return [await V1Score.from_db(score) for score in scores]
except KeyError:
raise HTTPException(400, "Invalid request")
@router.get(
@@ -136,7 +142,7 @@ async def get_user_recent(
async def get_scores(
user: str | None = Query(None, alias="u", description="用户"),
beatmap_id: int = Query(alias="b", description="谱面 ID"),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0, le=3),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0),
type: Literal["string", "id"] | None = Query(
None, description="用户类型string 用户名称 / id 用户 ID"
),
@@ -144,28 +150,31 @@ async def get_scores(
mods: int = Query(0, description="成绩的 MOD"),
session: AsyncSession = Depends(get_db),
):
if user is not None:
scores = (
await session.exec(
select(Score)
.where(
Score.gamemode == INT_TO_MODE[ruleset_id],
Score.beatmap_id == beatmap_id,
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
try:
if user is not None:
scores = (
await session.exec(
select(Score)
.where(
Score.gamemode == GameMode.from_int_extra(ruleset_id),
Score.beatmap_id == beatmap_id,
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
)
.options(joinedload(Score.beatmap))
.order_by(col(Score.classic_total_score).desc())
)
.options(joinedload(Score.beatmap))
.order_by(col(Score.classic_total_score).desc())
).all()
else:
scores, _ = await get_leaderboard(
session,
beatmap_id,
GameMode.from_int_extra(ruleset_id),
LeaderboardType.GLOBAL,
[mod["acronym"] for mod in int_to_mods(mods)],
limit=limit,
)
).all()
else:
scores, _ = await get_leaderboard(
session,
beatmap_id,
INT_TO_MODE[ruleset_id],
LeaderboardType.GLOBAL,
[mod["acronym"] for mod in int_to_mods(mods)],
limit=limit,
)
return [await V1Score.from_db(score) for score in scores]
return [await V1Score.from_db(score) for score in scores]
except KeyError:
raise HTTPException(400, "Invalid request")

View File

@@ -6,11 +6,11 @@ from typing import Literal
from app.database.lazer_user import User
from app.database.statistics import UserStatistics, UserStatisticsResp
from app.dependencies.database import get_db
from app.models.score import INT_TO_MODE, GameMode
from app.models.score import GameMode
from .router import AllStrModel, router
from fastapi import Depends, Query
from fastapi import Depends, HTTPException, Query
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -65,7 +65,9 @@ class V1User(AllStrModel):
playcount=statistics.play_count if statistics else 0,
ranked_score=statistics.ranked_score if statistics else 0,
total_score=statistics.total_score if statistics else 0,
pp_rank=statistics.global_rank if statistics else 0,
pp_rank=statistics.global_rank
if statistics and statistics.global_rank
else 0,
level=current_statistics.level_current if current_statistics else 0,
pp_raw=statistics.pp if statistics else 0.0,
accuracy=statistics.hit_accuracy if statistics else 0,
@@ -76,7 +78,9 @@ class V1User(AllStrModel):
count_rank_a=current_statistics.grade_a if current_statistics else 0,
country=db_user.country_code,
total_seconds_played=statistics.play_time if statistics else 0,
pp_country_rank=statistics.country_rank if statistics else 0,
pp_country_rank=statistics.country_rank
if statistics and statistics.country_rank
else 0,
events=[], # TODO
)
@@ -89,9 +93,7 @@ class V1User(AllStrModel):
)
async def get_user(
user: str = Query(..., alias="u", description="用户"),
ruleset_id: int | None = Query(
None, alias="m", description="Ruleset ID", ge=0, le=3
),
ruleset_id: int | None = Query(None, alias="m", description="Ruleset ID", ge=0),
type: Literal["string", "id"] | None = Query(
None, description="用户类型string 用户名称 / id 用户 ID"
),
@@ -111,8 +113,13 @@ async def get_user(
).first()
if not db_user:
return []
return [
await V1User.from_db(
session, db_user, INT_TO_MODE[ruleset_id] if ruleset_id else None
)
]
try:
return [
await V1User.from_db(
session,
db_user,
GameMode.from_int_extra(ruleset_id) if ruleset_id else None,
)
]
except KeyError:
raise HTTPException(400, "Invalid request")

View File

@@ -13,7 +13,6 @@ from app.fetcher import Fetcher
from app.models.beatmap import BeatmapAttributes
from app.models.mods import APIMod, int_to_mods
from app.models.score import (
INT_TO_MODE,
GameMode,
)
@@ -168,7 +167,7 @@ async def get_beatmap_attributes(
default=None, description="指定 ruleset为空则使用谱面自身模式"
),
ruleset_id: int | None = Query(
default=None, description="以数字指定 ruleset (与 ruleset 二选一)"
default=None, description="以数字指定 ruleset (与 ruleset 二选一)", ge=0, le=3
),
redis: Redis = Depends(get_redis),
db: AsyncSession = Depends(get_db),
@@ -185,7 +184,7 @@ async def get_beatmap_attributes(
mods_.append(APIMod(acronym=i, settings={}))
mods_.sort(key=lambda x: x["acronym"])
if ruleset_id is not None and ruleset is None:
ruleset = INT_TO_MODE[ruleset_id]
ruleset = GameMode.from_int(ruleset_id)
if ruleset is None:
beatmap_db = await Beatmap.get_or_fetch(db, fetcher, beatmap_id)
ruleset = beatmap_db.mode

View File

@@ -37,7 +37,6 @@ from app.dependencies.user import get_client_user, get_current_user
from app.fetcher import Fetcher
from app.models.room import RoomCategory
from app.models.score import (
INT_TO_MODE,
GameMode,
LeaderboardType,
Rank,
@@ -187,7 +186,7 @@ async def get_user_beatmap_score(
beatmap_id: int = Path(description="谱面 ID"),
user_id: int = Path(description="用户 ID"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
mode: str = Query(None, description="指定 ruleset (可选)"),
mode: GameMode | None = Query(None, description="指定 ruleset (可选)"),
mods: str = Query(None, description="筛选使用的 Mods (暂未实现)"),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
@@ -232,7 +231,7 @@ async def get_user_all_beatmap_scores(
beatmap_id: int = Path(description="谱面 ID"),
user_id: int = Path(description="用户 ID"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
ruleset: str = Query(None, description="指定 ruleset (可选)"),
ruleset: GameMode | None = Query(None, description="指定 ruleset (可选)"),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
):
@@ -275,7 +274,7 @@ async def create_solo_score(
score_token = ScoreToken(
user_id=current_user.id,
beatmap_id=beatmap_id,
ruleset_id=INT_TO_MODE[ruleset_id],
ruleset_id=GameMode.from_int(ruleset_id),
)
db.add(score_token)
await db.commit()
@@ -370,7 +369,7 @@ async def create_playlist_score(
score_token = ScoreToken(
user_id=current_user.id,
beatmap_id=beatmap_id,
ruleset_id=INT_TO_MODE[ruleset_id],
ruleset_id=GameMode.from_int(ruleset_id),
playlist_item_id=playlist_id,
)
session.add(score_token)

View File

@@ -19,7 +19,7 @@ from app.dependencies.fetcher import get_fetcher
from app.fetcher import Fetcher
from app.log import logger
from app.models.mods import mods_can_get_pp
from app.models.score import MODE_TO_INT, GameMode
from app.models.score import GameMode
from httpx import HTTPError
from redis.asyncio import Redis
@@ -80,9 +80,7 @@ async def _recalculate_pp(
await asyncio.sleep(2)
continue
ranked = db_beatmap.beatmap_status.has_pp() | settings.enable_all_beatmap_pp
if not ranked or not mods_can_get_pp(
MODE_TO_INT[score.gamemode], score.mods
):
if not ranked or not mods_can_get_pp(int(score.gamemode), score.mods):
score.pp = 0
break
try: