feat(v1-api): support api v1

This commit is contained in:
MingxuanGame
2025-08-14 09:11:53 +00:00
parent c98ea9c723
commit a8906b8194
15 changed files with 802 additions and 18 deletions

171
app/router/v1/score.py Normal file
View File

@@ -0,0 +1,171 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from typing import Literal
from app.database.pp_best_score import PPBestScore
from app.database.score import Score, get_leaderboard
from app.dependencies.database import get_db
from app.models.mods import int_to_mods, mods_to_int
from app.models.score import INT_TO_MODE, LeaderboardType
from .router import AllStrModel, router
from fastapi import Depends, Query
from sqlalchemy.orm import joinedload
from sqlmodel import col, exists, select
from sqlmodel.ext.asyncio.session import AsyncSession
class V1Score(AllStrModel):
beatmap_id: int | None = None
username: str | None = None
score_id: int
score: int
maxcombo: int | None = None
count50: int
count100: int
count300: int
countmiss: int
countkatu: int
countgeki: int
perfect: bool
enabled_mods: int
user_id: int
date: datetime
rank: str
pp: float
replay_available: bool
@classmethod
async def from_db(cls, score: Score):
return cls(
beatmap_id=score.beatmap_id,
username=score.user.username,
score_id=score.id,
score=score.total_score,
maxcombo=score.max_combo,
count50=score.n50,
count100=score.n100,
count300=score.n300,
countmiss=score.nmiss,
countkatu=score.nkatu,
countgeki=score.ngeki,
perfect=score.is_perfect_combo,
enabled_mods=mods_to_int(score.mods),
user_id=score.user_id,
date=score.ended_at,
rank=score.rank,
pp=score.pp,
replay_available=score.has_replay,
)
@router.get(
"/get_user_best",
response_model=list[V1Score],
name="获取用户最好成绩",
description="获取指定用户的最好成绩。",
)
async def get_user_best(
user: str = Query(..., alias="u", description="用户"),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0, le=3),
type: Literal["string", "id"] | None = Query(
None, description="用户类型string 用户名称 / id 用户 ID"
),
limit: int = Query(10, ge=1, le=100, description="返回的成绩数量"),
session: AsyncSession = Depends(get_db),
):
scores = (
await session.exec(
select(Score)
.where(
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.gamemode == INT_TO_MODE[ruleset_id],
exists().where(col(PPBestScore.score_id) == Score.id),
)
.order_by(col(Score.pp).desc())
.options(joinedload(Score.beatmap))
.limit(limit)
)
).all()
return [await V1Score.from_db(score) for score in scores]
@router.get(
"/get_user_recent",
response_model=list[V1Score],
name="获取用户最近成绩",
description="获取指定用户的最近成绩。",
)
async def get_user_recent(
user: str = Query(..., alias="u", description="用户"),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0, le=3),
type: Literal["string", "id"] | None = Query(
None, description="用户类型string 用户名称 / id 用户 ID"
),
limit: int = Query(10, ge=1, le=100, description="返回的成绩数量"),
session: AsyncSession = Depends(get_db),
):
scores = (
await session.exec(
select(Score)
.where(
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
Score.gamemode == INT_TO_MODE[ruleset_id],
Score.ended_at > datetime.now(UTC) - timedelta(hours=24),
)
.order_by(col(Score.pp).desc())
.options(joinedload(Score.beatmap))
.limit(limit)
)
).all()
return [await V1Score.from_db(score) for score in scores]
@router.get(
"/get_scores",
response_model=list[V1Score],
name="获取成绩",
description="获取指定谱面的成绩。",
)
async def get_scores(
user: str | None = Query(None, alias="u", description="用户"),
beatmap_id: int = Query(alias="b", description="谱面 ID"),
ruleset_id: int = Query(0, alias="m", description="Ruleset ID", ge=0, le=3),
type: Literal["string", "id"] | None = Query(
None, description="用户类型string 用户名称 / id 用户 ID"
),
limit: int = Query(10, ge=1, le=100, description="返回的成绩数量"),
mods: int = Query(0, description="成绩的 MOD"),
session: AsyncSession = Depends(get_db),
):
if user is not None:
scores = (
await session.exec(
select(Score)
.where(
Score.gamemode == INT_TO_MODE[ruleset_id],
Score.beatmap_id == beatmap_id,
Score.user_id == user
if type == "id" or user.isdigit()
else Score.user.username == user,
)
.options(joinedload(Score.beatmap))
.order_by(col(Score.classic_total_score).desc())
)
).all()
else:
scores, _ = await get_leaderboard(
session,
beatmap_id,
INT_TO_MODE[ruleset_id],
LeaderboardType.GLOBAL,
[mod["acronym"] for mod in int_to_mods(mods)],
limit=limit,
)
return [await V1Score.from_db(score) for score in scores]