feat(api): 支持 x-api-version (#29)

* feat(relationship): support legacy-compatible response format

* feat(score): add support for legacy score response format in API

* fix(score): avoid missing greenlet

* fix(score): fix missing field for model validation

* feat(user): apply legacy score format for user

* feat(api): use `int` to hint `APIVersion`
This commit is contained in:
MingxuanGame
2025-09-14 14:09:53 +08:00
committed by GitHub
parent e591280620
commit 19f94fffbb
6 changed files with 218 additions and 35 deletions

View File

@@ -47,7 +47,7 @@ from .relationship import (
)
from .score_token import ScoreToken
from pydantic import field_serializer, field_validator
from pydantic import BaseModel, field_serializer, field_validator
from redis.asyncio import Redis
from sqlalchemy import Boolean, Column, DateTime, TextClause
from sqlalchemy.ext.asyncio import AsyncAttrs
@@ -194,6 +194,11 @@ class Score(ScoreBase, table=True):
def is_perfect_combo(self) -> bool:
return self.max_combo == self.beatmap.max_combo
async def to_resp(self, session: AsyncSession, api_version: int) -> "ScoreResp | LegacyScoreResp":
if api_version >= 20220705:
return await ScoreResp.from_db(session, self)
return await LegacyScoreResp.from_db(session, self)
class ScoreResp(ScoreBase):
id: int
@@ -334,6 +339,90 @@ class ScoreResp(ScoreBase):
return s
class LegacyStatistics(BaseModel):
count_300: int
count_100: int
count_50: int
count_miss: int
count_geki: int | None = None
count_katu: int | None = None
class LegacyScoreResp(UTCBaseModel):
accuracy: float
best_id: int
created_at: datetime
id: int
max_combo: int
mode: GameMode
mode_int: int
mods: list[str] # acronym
passed: bool
perfect: bool = False
pp: float
rank: Rank
replay: bool
score: int
statistics: LegacyStatistics
type: str
user_id: int
current_user_attributes: CurrentUserAttributes
user: UserResp
beatmap: BeatmapResp
rank_global: int | None = Field(default=None, exclude=True)
@classmethod
async def from_db(cls, session: AsyncSession, score: Score) -> "LegacyScoreResp":
await session.refresh(score)
await score.awaitable_attrs.beatmap
return cls(
accuracy=score.accuracy,
best_id=await get_best_id(session, score.id) or 0,
created_at=score.started_at,
id=score.id,
max_combo=score.max_combo,
mode=score.gamemode,
mode_int=int(score.gamemode),
mods=[m["acronym"] for m in score.mods],
passed=score.passed,
pp=score.pp,
rank=score.rank,
replay=score.has_replay,
score=score.total_score,
statistics=LegacyStatistics(
count_300=score.n300,
count_100=score.n100,
count_50=score.n50,
count_miss=score.nmiss,
count_geki=score.ngeki or 0,
count_katu=score.nkatu or 0,
),
type=score.type,
user_id=score.user_id,
current_user_attributes=CurrentUserAttributes(
pin=PinAttributes(is_pinned=bool(score.pinned_order), score_id=score.id)
),
user=await UserResp.from_db(
score.user,
session,
include=["statistics", "team", "daily_challenge_user_stats"],
ruleset=score.gamemode,
),
beatmap=await BeatmapResp.from_db(score.beatmap),
perfect=score.is_perfect_combo,
rank_global=(
await get_score_position_by_id(
session,
score.beatmap_id,
score.id,
mode=score.gamemode,
user=score.user,
)
or None
),
)
class MultiplayerScores(RespWithCursor):
scores: list[ScoreResp] = Field(default_factory=list)
params: dict[str, Any] = Field(default_factory=dict)

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
from typing import Annotated
from fastapi import Depends, Header
def get_api_version(version: int | None = Header(None, alias="x-api-version")) -> int:
if version is None:
return 0
if version < 1:
raise ValueError
return version
APIVersion = Annotated[int, Depends(get_api_version)]

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
from app.database import Relationship, RelationshipResp, RelationshipType, User
from app.database.lazer_user import UserResp
from app.dependencies.api_version import APIVersion
from app.dependencies.database import Database
from app.dependencies.user import get_client_user, get_current_user
@@ -14,9 +16,34 @@ from sqlmodel import exists, select
@router.get(
"/friends",
tags=["用户关系"],
response_model=list[RelationshipResp],
responses={
200: {
"description": "好友列表",
"content": {
"application/json": {
"schema": {
"oneOf": [
{
"type": "array",
"items": {"$ref": "#/components/schemas/RelationshipResp"},
"description": "好友列表",
},
{
"type": "array",
"items": {"$ref": "#/components/schemas/UserResp"},
"description": "好友列表 (`x-api-version < 20241022`)",
},
]
}
}
},
}
},
name="获取好友列表",
description="获取当前用户的好友列表。",
description=(
"获取当前用户的好友列表。\n\n"
"如果 `x-api-version < 20241022`,返回值为 `UserResp` 列表,否则为 `RelationshipResp` 列表。"
),
)
@router.get(
"/blocks",
@@ -28,6 +55,7 @@ from sqlmodel import exists, select
async def get_relationship(
db: Database,
request: Request,
api_version: APIVersion,
current_user: User = Security(get_current_user, scopes=["friends.read"]),
):
relationship_type = RelationshipType.FOLLOW if request.url.path.endswith("/friends") else RelationshipType.BLOCK
@@ -37,7 +65,22 @@ async def get_relationship(
Relationship.type == relationship_type,
)
)
return [await RelationshipResp.from_db(db, rel) for rel in relationships.unique()]
if api_version >= 20241022 or relationship_type == RelationshipType.BLOCK:
return [await RelationshipResp.from_db(db, rel) for rel in relationships.unique()]
else:
return [
await UserResp.from_db(
rel.target,
db,
include=[
"team",
"daily_challenge_user_stats",
"statistics",
"statistics_rulesets",
],
)
for rel in relationships.unique()
]
class AddFriendResp(BaseModel):

View File

@@ -29,12 +29,14 @@ from app.database.playlist_best_score import (
)
from app.database.relationship import Relationship, RelationshipType
from app.database.score import (
LegacyScoreResp,
MultiplayerScores,
ScoreAround,
get_leaderboard,
process_score,
process_user,
)
from app.dependencies.api_version import APIVersion
from app.dependencies.database import Database, get_redis, with_db
from app.dependencies.fetcher import get_fetcher
from app.dependencies.storage import get_storage_service
@@ -264,21 +266,31 @@ async def _preload_beatmap_for_pp_calculation(beatmap_id: int) -> None:
logger.warning(f"Failed to preload beatmap {beatmap_id}: {e}")
class BeatmapScores(BaseModel):
scores: list[ScoreResp]
user_score: BeatmapUserScore | None = None
class BeatmapUserScore[T: ScoreResp | LegacyScoreResp](BaseModel):
position: int
score: T
class BeatmapScores[T: ScoreResp | LegacyScoreResp](BaseModel):
scores: list[T]
user_score: BeatmapUserScore[T] | None = None
score_count: int = 0
@router.get(
"/beatmaps/{beatmap_id}/scores",
tags=["成绩"],
response_model=BeatmapScores,
response_model=BeatmapScores[ScoreResp] | BeatmapScores[LegacyScoreResp],
name="获取谱面排行榜",
description="获取指定谱面在特定条件下的排行榜及当前用户成绩。",
description=(
"获取指定谱面在特定条件下的排行榜及当前用户成绩。\n\n"
"如果 `x-api-version >= 20220705`,返回值为 `BeatmapScores[ScoreResp]`"
"否则为 `BeatmapScores[LegacyScoreResp]`。"
),
)
async def get_beatmap_scores(
db: Database,
api_version: APIVersion,
beatmap_id: int = Path(description="谱面 ID"),
mode: GameMode = Query(description="指定 auleset"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
@@ -303,9 +315,9 @@ async def get_beatmap_scores(
mods=sorted(mods),
)
user_score_resp = await ScoreResp.from_db(db, user_score) if user_score else None
user_score_resp = await user_score.to_resp(db, api_version) if user_score else None
resp = BeatmapScores(
scores=[await ScoreResp.from_db(db, score) for score in all_scores],
scores=[await score.to_resp(db, api_version) for score in all_scores],
user_score=BeatmapUserScore(score=user_score_resp, position=user_score_resp.rank_global or 0)
if user_score_resp
else None,
@@ -314,20 +326,20 @@ async def get_beatmap_scores(
return resp
class BeatmapUserScore(BaseModel):
position: int
score: ScoreResp
@router.get(
"/beatmaps/{beatmap_id}/scores/users/{user_id}",
tags=["成绩"],
response_model=BeatmapUserScore,
response_model=BeatmapUserScore[ScoreResp] | BeatmapUserScore[LegacyScoreResp],
name="获取用户谱面最高成绩",
description="获取指定用户在指定谱面上的最高成绩。",
description=(
"获取指定用户在指定谱面上的最高成绩。\n\n"
"如果 `x-api-version >= 20220705`,返回值为 `BeatmapUserScore[ScoreResp]`"
"否则为 `BeatmapUserScore[LegacyScoreResp]`。"
),
)
async def get_user_beatmap_score(
db: Database,
api_version: APIVersion,
beatmap_id: int = Path(description="谱面 ID"),
user_id: int = Path(description="用户 ID"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
@@ -355,7 +367,7 @@ async def get_user_beatmap_score(
detail=f"Cannot find user {user_id}'s score on this beatmap",
)
else:
resp = await ScoreResp.from_db(db, user_score)
resp = await user_score.to_resp(db, api_version=api_version)
return BeatmapUserScore(
position=resp.rank_global or 0,
score=resp,
@@ -365,12 +377,17 @@ async def get_user_beatmap_score(
@router.get(
"/beatmaps/{beatmap_id}/scores/users/{user_id}/all",
tags=["成绩"],
response_model=list[ScoreResp],
response_model=list[ScoreResp] | list[LegacyScoreResp],
name="获取用户谱面全部成绩",
description="获取指定用户在指定谱面上的全部成绩列表。",
description=(
"获取指定用户在指定谱面上的全部成绩列表。\n\n"
"如果 `x-api-version >= 20220705`,返回值为 `ScoreResp`列表,"
"否则为 `LegacyScoreResp`列表。"
),
)
async def get_user_all_beatmap_scores(
db: Database,
api_version: APIVersion,
beatmap_id: int = Path(description="谱面 ID"),
user_id: int = Path(description="用户 ID"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
@@ -390,7 +407,7 @@ async def get_user_all_beatmap_scores(
)
).all()
return [await ScoreResp.from_db(db, score) for score in all_user_scores]
return [await score.to_resp(db, api_version) for score in all_user_scores]
@router.post(

View File

@@ -15,7 +15,8 @@ from app.database import (
from app.database.events import Event
from app.database.lazer_user import SEARCH_INCLUDED
from app.database.pp_best_score import PPBestScore
from app.database.score import Score, ScoreResp, get_user_first_scores
from app.database.score import LegacyScoreResp, Score, ScoreResp, get_user_first_scores
from app.dependencies.api_version import APIVersion
from app.dependencies.database import Database, get_redis
from app.dependencies.user import get_current_user
from app.log import logger
@@ -313,13 +314,18 @@ async def get_user_beatmapsets(
@router.get(
"/users/{user_id}/scores/{type}",
response_model=list[ScoreResp],
response_model=list[ScoreResp] | list[LegacyScoreResp],
name="获取用户成绩列表",
description="获取用户特定类型的成绩列表,如最好成绩、最近成绩等。",
description=(
"获取用户特定类型的成绩列表,如最好成绩、最近成绩等。\n\n"
"如果 `x-api-version >= 20220705`,返回值为 `ScoreResp`列表,"
"否则为 `LegacyScoreResp`列表。"
),
tags=["用户"],
)
async def get_user_scores(
session: Database,
api_version: APIVersion,
background_task: BackgroundTasks,
user_id: int = Path(description="用户 ID"),
type: Literal["best", "recent", "firsts", "pinned"] = Path(
@@ -332,12 +338,15 @@ async def get_user_scores(
offset: int = Query(0, ge=0, description="偏移量"),
current_user: User = Security(get_current_user, scopes=["public"]),
):
is_legacy_api = api_version < 20220705
redis = get_redis()
cache_service = get_user_cache_service(redis)
# 先尝试从缓存获取对于recent类型使用较短的缓存时间
cache_expire = 30 if type == "recent" else settings.user_scores_cache_expire_seconds
cached_scores = await cache_service.get_user_scores_from_cache(user_id, type, include_fails, mode, limit, offset)
cached_scores = await cache_service.get_user_scores_from_cache(
user_id, type, include_fails, mode, limit, offset, is_legacy_api
)
if cached_scores is not None:
return cached_scores
@@ -373,9 +382,9 @@ async def get_user_scores(
scores = [best_score.score for best_score in best_scores]
score_responses = [
await ScoreResp.from_db(
await score.to_resp(
session,
score,
api_version,
)
for score in scores
]
@@ -385,12 +394,13 @@ async def get_user_scores(
cache_service.cache_user_scores,
user_id,
type,
score_responses,
score_responses, # pyright: ignore[reportArgumentType]
include_fails,
mode,
limit,
offset,
cache_expire,
is_legacy_api,
)
return score_responses

View File

@@ -13,7 +13,7 @@ from app.config import settings
from app.const import BANCHOBOT_ID
from app.database import User, UserResp
from app.database.lazer_user import SEARCH_INCLUDED
from app.database.score import ScoreResp
from app.database.score import LegacyScoreResp, ScoreResp
from app.log import logger
from app.models.score import GameMode
from app.service.asset_proxy_service import get_asset_proxy_service
@@ -111,11 +111,13 @@ class UserCacheService:
mode: GameMode | None = None,
limit: int = 100,
offset: int = 0,
is_legacy: bool = False,
) -> str:
"""生成用户成绩缓存键"""
mode_part = f":{mode}" if mode else ""
return (
f"user:{user_id}:scores:{score_type}{mode_part}:limit:{limit}:offset:{offset}:include_fail:{include_fail}"
f"user:{user_id}:scores:{score_type}{mode_part}:limit:{limit}:offset:"
f"{offset}:include_fail:{include_fail}:is_legacy:{is_legacy}"
)
def _get_user_beatmapsets_cache_key(
@@ -166,10 +168,13 @@ class UserCacheService:
mode: GameMode | None = None,
limit: int = 100,
offset: int = 0,
) -> list[ScoreResp] | None:
is_legacy: bool = False,
) -> list[ScoreResp] | list[LegacyScoreResp] | None:
"""从缓存获取用户成绩"""
try:
cache_key = self._get_user_scores_cache_key(user_id, score_type, include_fail, mode, limit, offset)
cache_key = self._get_user_scores_cache_key(
user_id, score_type, include_fail, mode, limit, offset, is_legacy
)
cached_data = await self.redis.get(cache_key)
if cached_data:
logger.debug(f"User scores cache hit for user {user_id}, type {score_type}")
@@ -184,18 +189,21 @@ class UserCacheService:
self,
user_id: int,
score_type: str,
scores: list[ScoreResp],
scores: list[ScoreResp] | list[LegacyScoreResp],
include_fail: bool,
mode: GameMode | None = None,
limit: int = 100,
offset: int = 0,
expire_seconds: int | None = None,
is_legacy: bool = False,
):
"""缓存用户成绩"""
try:
if expire_seconds is None:
expire_seconds = settings.user_scores_cache_expire_seconds
cache_key = self._get_user_scores_cache_key(user_id, score_type, include_fail, mode, limit, offset)
cache_key = self._get_user_scores_cache_key(
user_id, score_type, include_fail, mode, limit, offset, is_legacy
)
# 使用 model_dump_json() 而不是 model_dump() + json.dumps()
scores_json_list = [score.model_dump_json() for score in scores]
cached_data = f"[{','.join(scores_json_list)}]"