feat(beatmap): implement get beatmap arrtibutes

This commit is contained in:
MingxuanGame
2025-07-26 17:31:23 +00:00
parent ef977d1c2d
commit 900a17f815
16 changed files with 425 additions and 171 deletions

View File

@@ -5,6 +5,7 @@ from . import ( # pyright: ignore[reportUnusedImport] # noqa: F401
beatmapset,
me,
relationship,
score,
)
from .api_router import router as api_router
from .auth import router as auth_router

View File

@@ -1,22 +1,31 @@
from __future__ import annotations
import asyncio
import hashlib
import json
from app.database import (
Beatmap,
BeatmapResp,
User as DBUser,
)
from app.database.beatmapset import Beatmapset
from app.database.score import Score, ScoreResp
from app.dependencies.database import get_db
from app.dependencies.database import get_db, get_redis
from app.dependencies.fetcher import get_fetcher
from app.dependencies.user import get_current_user
from app.fetcher import Fetcher
from app.models.beatmap import BeatmapAttributes
from app.models.mods import APIMod, int_to_mods
from app.models.score import INT_TO_MODE, GameMode
from app.utils import calculate_beatmap_attribute
from .api_router import router
from fastapi import Depends, HTTPException, Query
from httpx import HTTPStatusError
from httpx import HTTPError, HTTPStatusError
from pydantic import BaseModel
from redis import Redis
import rosu_pp_py as rosu
from sqlalchemy.orm import joinedload
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -29,32 +38,11 @@ async def get_beatmap(
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
):
beatmap = (
await db.exec(
select(Beatmap)
.options(
joinedload(Beatmap.beatmapset).selectinload( # pyright: ignore[reportArgumentType]
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
)
.where(Beatmap.id == bid)
)
).first()
if not beatmap:
try:
resp = await fetcher.get_beatmap(bid)
r = await db.exec(
select(Beatmapset.id).where(Beatmapset.id == resp.beatmapset_id)
)
if not r.first():
set_resp = await fetcher.get_beatmapset(resp.beatmapset_id)
await Beatmapset.from_resp(db, set_resp, from_=resp.id)
await Beatmap.from_resp(db, resp)
except HTTPStatusError:
raise HTTPException(status_code=404, detail="Beatmap not found")
else:
resp = BeatmapResp.from_db(beatmap)
return resp
try:
beatmap = await Beatmap.get_or_fetch(db, bid, fetcher)
return BeatmapResp.from_db(beatmap)
except HTTPError:
raise HTTPException(status_code=404, detail="Beatmap not found")
class BatchGetResp(BaseModel):
@@ -75,7 +63,7 @@ async def batch_get_beatmaps(
select(Beatmap)
.options(
joinedload(
Beatmap.beatmapset # pyright: ignore[reportArgumentType]
Beatmap.beatmapset # pyright: ignore[reportArgumentType]
).selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
@@ -90,8 +78,8 @@ async def batch_get_beatmaps(
select(Beatmap)
.options(
joinedload(
Beatmap.beatmapset # pyright: ignore[reportArgumentType]
).selectinload(
Beatmap.beatmapset # pyright: ignore[reportArgumentType]
).selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
)
@@ -103,137 +91,52 @@ async def batch_get_beatmaps(
return BatchGetResp(beatmaps=[BeatmapResp.from_db(bm) for bm in beatmaps])
class BeatmapScores(BaseModel):
scores: list[ScoreResp]
userScore: ScoreResp | None = None
@router.get(
"/beatmaps/{beatmap}/scores", tags=["beatmap"], response_model=BeatmapScores
)
async def get_beatmap_scores(
beatmap: int,
legacy_only: bool = Query(None), # TODO:加入对这个参数的查询
mode: str = Query(None),
# mods: List[APIMod] = Query(None), # TODO:加入指定MOD的查询
type: str = Query(None),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
if legacy_only:
raise HTTPException(
status_code=404, detail="this server only contains lazer scores"
)
all_scores = (
await db.exec(
select(Score).where(Score.beatmap_id == beatmap)
# .where(Score.mods == mods if mods else True)
)
).all()
user_score = (
await db.exec(
select(Score)
.options(
joinedload(Score.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
)
.where(Score.beatmap_id == beatmap)
.where(Score.user_id == current_user.id)
)
).first()
return BeatmapScores(
scores=[ScoreResp.from_db(score) for score in all_scores],
userScore=ScoreResp.from_db(user_score) if user_score else None,
)
class BeatmapUserScore(BaseModel):
position: int
score: ScoreResp
@router.get(
"/beatmaps/{beatmap}/scores/users/{user}",
@router.post(
"/beatmaps/{beatmap}/attributes",
tags=["beatmap"],
response_model=BeatmapUserScore,
response_model=BeatmapAttributes,
)
async def get_user_beatmap_score(
async def get_beatmap_attributes(
beatmap: int,
user: int,
legacy_only: bool = Query(None),
mode: str = Query(None),
mods: str = Query(None), # TODO:添加mods筛选
current_user: DBUser = Depends(get_current_user),
mods: list[str] = Query(default_factory=list),
ruleset: GameMode | None = Query(default=None),
ruleset_id: int | None = Query(default=None),
redis: Redis = Depends(get_redis),
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
):
if legacy_only:
raise HTTPException(
status_code=404, detail="This server only contains non-legacy scores"
)
user_score = (
await db.exec(
select(Score)
.options(
joinedload(Score.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
)
.where(Score.gamemode==mode if mode is not None else True)
.where(Score.beatmap_id == beatmap)
.where(Score.user_id == user)
.order_by(col(Score.classic_total_score).desc())
)
).first()
if not user_score:
raise HTTPException(
status_code=404, detail="Cannot find user %s's score on this beatmap" % user
)
mods_ = []
if mods and mods[0].isdigit():
mods_ = int_to_mods(int(mods[0]))
else:
return BeatmapUserScore(
position=user_score.position if user_score.position is not None else 0,
score=ScoreResp.from_db(user_score),
)
for i in mods:
try:
mods_.append(json.loads(i))
except json.JSONDecodeError:
mods_.append(APIMod(acronym=i, settings={}))
mods_.sort(key=lambda x: x["acronym"])
if ruleset_id is not None and ruleset is None:
ruleset = INT_TO_MODE[ruleset_id]
if ruleset is None:
beatmap_db = await Beatmap.get_or_fetch(db, beatmap, fetcher)
ruleset = beatmap_db.mode
key = (
f"beatmap:{beatmap}:{ruleset}:"
f"{hashlib.md5(str(mods_).encode()).hexdigest()}:attributes"
)
if redis.exists(key):
return BeatmapAttributes.model_validate_json(redis.get(key)) # pyright: ignore[reportArgumentType]
@router.get(
"/beatmaps/{beatmap}/scores/users/{user}/all",
tags=["beatmap"],
response_model=list[ScoreResp],
)
async def get_user_all_beatmap_scores(
beatmap: int,
user: int,
legacy_only: bool = Query(None),
ruleset: str = Query(None),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
if legacy_only:
raise HTTPException(status_code=404,detail="This server only contains non-legacy scores")
all_user_scores=(
await db.exec(
select(Score)
.options(
joinedload(Score.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
try:
resp = await fetcher.get_beatmap_raw(beatmap)
try:
attr = await asyncio.get_event_loop().run_in_executor(
None, calculate_beatmap_attribute, resp, ruleset, mods_
)
.where(Score.gamemode==ruleset if ruleset is not None else True)
.where(Score.beatmap_id == beatmap)
.where(Score.user_id == user)
.order_by(col(Score.classic_total_score).desc())
)
).all()
return [ScoreResp.from_db(score) for score in all_user_scores]
except rosu.ConvertError as e: # pyright: ignore[reportAttributeAccessIssue]
raise HTTPException(status_code=400, detail=str(e))
redis.set(key, attr.model_dump_json())
return attr
except HTTPStatusError:
raise HTTPException(status_code=404, detail="Beatmap not found")

156
app/router/score.py Normal file
View File

@@ -0,0 +1,156 @@
from __future__ import annotations
from app.database import (
Beatmap,
User as DBUser,
)
from app.database.beatmapset import Beatmapset
from app.database.score import Score, ScoreResp
from app.dependencies.database import get_db
from app.dependencies.user import get_current_user
from .api_router import router
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.orm import joinedload
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
class BeatmapScores(BaseModel):
scores: list[ScoreResp]
userScore: ScoreResp | None = None
@router.get(
"/beatmaps/{beatmap}/scores", tags=["beatmap"], response_model=BeatmapScores
)
async def get_beatmap_scores(
beatmap: int,
legacy_only: bool = Query(None), # TODO:加入对这个参数的查询
mode: str = Query(None),
# mods: List[APIMod] = Query(None), # TODO:加入指定MOD的查询
type: str = Query(None),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
if legacy_only:
raise HTTPException(
status_code=404, detail="this server only contains lazer scores"
)
all_scores = (
await db.exec(
select(Score).where(Score.beatmap_id == beatmap)
# .where(Score.mods == mods if mods else True)
)
).all()
user_score = (
await db.exec(
select(Score)
.options(
joinedload(Score.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
)
.where(Score.beatmap_id == beatmap)
.where(Score.user_id == current_user.id)
)
).first()
return BeatmapScores(
scores=[ScoreResp.from_db(score) for score in all_scores],
userScore=ScoreResp.from_db(user_score) if user_score else None,
)
class BeatmapUserScore(BaseModel):
position: int
score: ScoreResp
@router.get(
"/beatmaps/{beatmap}/scores/users/{user}",
tags=["beatmap"],
response_model=BeatmapUserScore,
)
async def get_user_beatmap_score(
beatmap: int,
user: int,
legacy_only: bool = Query(None),
mode: str = Query(None),
mods: str = Query(None), # TODO:添加mods筛选
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
if legacy_only:
raise HTTPException(
status_code=404, detail="This server only contains non-legacy scores"
)
user_score = (
await db.exec(
select(Score)
.options(
joinedload(Score.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
)
.where(Score.gamemode == mode if mode is not None else True)
.where(Score.beatmap_id == beatmap)
.where(Score.user_id == user)
.order_by(col(Score.classic_total_score).desc())
)
).first()
if not user_score:
raise HTTPException(
status_code=404, detail=f"Cannot find user {user}'s score on this beatmap"
)
else:
return BeatmapUserScore(
position=user_score.position if user_score.position is not None else 0,
score=ScoreResp.from_db(user_score),
)
@router.get(
"/beatmaps/{beatmap}/scores/users/{user}/all",
tags=["beatmap"],
response_model=list[ScoreResp],
)
async def get_user_all_beatmap_scores(
beatmap: int,
user: int,
legacy_only: bool = Query(None),
ruleset: str = Query(None),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
if legacy_only:
raise HTTPException(
status_code=404, detail="This server only contains non-legacy scores"
)
all_user_scores = (
await db.exec(
select(Score)
.options(
joinedload(Score.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
)
)
.where(Score.gamemode == ruleset if ruleset is not None else True)
.where(Score.beatmap_id == beatmap)
.where(Score.user_id == user)
.order_by(col(Score.classic_total_score).desc())
)
).all()
return [ScoreResp.from_db(score) for score in all_user_scores]