docs(api): add api docs

This commit is contained in:
MingxuanGame
2025-08-12 08:40:27 +00:00
parent 50c25ab0c7
commit 2fa6d6dd7e
13 changed files with 570 additions and 214 deletions

View File

@@ -105,15 +105,17 @@ def validate_password(password: str) -> list[str]:
router = APIRouter(tags=["osu! OAuth 认证"])
@router.post("/users")
@router.post(
"/users",
name="注册用户",
description="用户注册接口",
)
async def register_user(
user_username: str = Form(..., alias="user[username]"),
user_email: str = Form(..., alias="user[user_email]"),
user_password: str = Form(..., alias="user[password]"),
user_username: str = Form(..., alias="user[username]", description="用户名"),
user_email: str = Form(..., alias="user[user_email]", description="电子邮箱"),
user_password: str = Form(..., alias="user[password]", description="密码"),
db: AsyncSession = Depends(get_db),
):
"""用户注册接口 - 匹配 osu! 客户端的注册请求"""
username_errors = validate_username(user_username)
email_errors = validate_email(user_email)
password_errors = validate_password(user_password)
@@ -197,22 +199,28 @@ async def register_user(
)
@router.post("/oauth/token", response_model=TokenResponse)
@router.post(
"/oauth/token",
response_model=TokenResponse,
name="获取访问令牌",
description="OAuth 令牌端点,支持密码、刷新令牌和授权码三种授权方式。",
)
async def oauth_token(
grant_type: Literal[
"authorization_code", "refresh_token", "password", "client_credentials"
] = Form(...),
client_id: int = Form(...),
client_secret: str = Form(...),
code: str | None = Form(None),
scope: str = Form("*"),
username: str | None = Form(None),
password: str | None = Form(None),
refresh_token: str | None = Form(None),
] = Form(..., description="授权类型:密码/刷新令牌/授权码/客户端凭证"),
client_id: int = Form(..., description="客户端 ID"),
client_secret: str = Form(..., description="客户端密钥"),
code: str | None = Form(None, description="授权码(仅授权码模式需要)"),
scope: str = Form("*", description="权限范围(空格分隔,默认为 '*'"),
username: str | None = Form(None, description="用户名(仅密码模式需要)"),
password: str | None = Form(None, description="密码(仅密码模式需要)"),
refresh_token: str | None = Form(
None, description="刷新令牌(仅刷新令牌模式需要)"
),
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
):
"""OAuth 令牌端点"""
scopes = scope.split(" ")
client = (

View File

@@ -5,7 +5,7 @@ from app.fetcher import Fetcher
from fastapi import APIRouter, Depends
fetcher_router = APIRouter(prefix="/fetcher", tags=["fetcher"])
fetcher_router = APIRouter(prefix="/fetcher", include_in_schema=False)
@fetcher_router.get("/callback")

View File

@@ -6,7 +6,7 @@ from app.storage import LocalStorageService, StorageService
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse
file_router = APIRouter(prefix="/file")
file_router = APIRouter(prefix="/file", include_in_schema=False)
@file_router.get("/{path:path}")

View File

@@ -19,7 +19,7 @@ from app.models.score import (
from .router import router
from fastapi import Depends, HTTPException, Query, Security
from fastapi import Depends, HTTPException, Path, Query, Security
from httpx import HTTPError, HTTPStatusError
from pydantic import BaseModel
from redis.asyncio import Redis
@@ -28,11 +28,31 @@ from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
@router.get("/beatmaps/lookup", tags=["beatmap"], response_model=BeatmapResp)
class BatchGetResp(BaseModel):
"""批量获取谱面返回模型。
返回字段说明:
- beatmaps: 谱面详细信息列表。"""
beatmaps: list[BeatmapResp]
@router.get(
"/beatmaps/lookup",
tags=["谱面"],
name="查询单个谱面",
response_model=BeatmapResp,
description=(
"根据谱面 ID / MD5 / 文件名 查询单个谱面。"
"至少提供 id / checksum / filename 之一。"
),
)
async def lookup_beatmap(
id: int | None = Query(default=None, alias="id"),
md5: str | None = Query(default=None, alias="checksum"),
filename: str | None = Query(default=None, alias="filename"),
id: int | None = Query(default=None, alias="id", description="谱面 ID"),
md5: str | None = Query(default=None, alias="checksum", description="谱面文件 MD5"),
filename: str | None = Query(
default=None, alias="filename", description="谱面文件名"
),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
@@ -53,34 +73,45 @@ async def lookup_beatmap(
return await BeatmapResp.from_db(beatmap, session=db, user=current_user)
@router.get("/beatmaps/{bid}", tags=["beatmap"], response_model=BeatmapResp)
@router.get(
"/beatmaps/{beatmap_id}",
tags=["谱面"],
name="获取谱面详情",
response_model=BeatmapResp,
description="获取单个谱面详情。",
)
async def get_beatmap(
bid: int,
beatmap_id: int = Path(..., description="谱面 ID"),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
):
try:
beatmap = await Beatmap.get_or_fetch(db, fetcher, bid)
beatmap = await Beatmap.get_or_fetch(db, fetcher, beatmap_id)
return await BeatmapResp.from_db(beatmap, session=db, user=current_user)
except HTTPError:
raise HTTPException(status_code=404, detail="Beatmap not found")
class BatchGetResp(BaseModel):
beatmaps: list[BeatmapResp]
@router.get("/beatmaps", tags=["beatmap"], response_model=BatchGetResp)
@router.get("/beatmaps/", tags=["beatmap"], response_model=BatchGetResp)
@router.get(
"/beatmaps/",
tags=["谱面"],
name="批量获取谱面",
response_model=BatchGetResp,
description=(
"批量获取谱面。若不提供 ids[],按最近更新时间返回最多 50 条。"
"为空时按最近更新时间返回。"
),
)
async def batch_get_beatmaps(
b_ids: list[int] = Query(alias="ids[]", default_factory=list),
beatmap_ids: list[int] = Query(
alias="ids[]", default_factory=list, description="谱面 ID 列表 (最多 50 个)"
),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
):
if not b_ids:
# select 50 beatmaps by last_updated
if not beatmap_ids:
beatmaps = (
await db.exec(
select(Beatmap).order_by(col(Beatmap.last_updated).desc()).limit(50)
@@ -90,12 +121,12 @@ async def batch_get_beatmaps(
beatmaps = list(
(
await db.exec(
select(Beatmap).where(col(Beatmap.id).in_(b_ids)).limit(50)
select(Beatmap).where(col(Beatmap.id).in_(beatmap_ids)).limit(50)
)
).all()
)
not_found_beatmaps = [
bid for bid in b_ids if bid not in [bm.id for bm in beatmaps]
bid for bid in beatmap_ids if bid not in [bm.id for bm in beatmaps]
]
beatmaps.extend(
beatmap
@@ -120,16 +151,25 @@ async def batch_get_beatmaps(
@router.post(
"/beatmaps/{beatmap}/attributes",
tags=["beatmap"],
"/beatmaps/{beatmap_id}/attributes",
tags=["谱面"],
name="计算谱面属性",
response_model=BeatmapAttributes,
description=("计算谱面指定 mods / ruleset 下谱面的难度属性 (难度/PP 相关属性)。"),
)
async def get_beatmap_attributes(
beatmap: int,
beatmap_id: int = Path(..., description="谱面 ID"),
current_user: User = Security(get_current_user, scopes=["public"]),
mods: list[str] = Query(default_factory=list),
ruleset: GameMode | None = Query(default=None),
ruleset_id: int | None = Query(default=None),
mods: list[str] = Query(
default_factory=list,
description="Mods 列表;可为整型位掩码(单元素)或 JSON/简称",
),
ruleset: GameMode | None = Query(
default=None, description="指定 ruleset为空则使用谱面自身模式"
),
ruleset_id: int | None = Query(
default=None, description="以数字指定 ruleset (与 ruleset 二选一)"
),
redis: Redis = Depends(get_redis),
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
@@ -147,17 +187,17 @@ async def get_beatmap_attributes(
if ruleset_id is not None and ruleset is None:
ruleset = INT_TO_MODE[ruleset_id]
if ruleset is None:
beatmap_db = await Beatmap.get_or_fetch(db, fetcher, beatmap)
beatmap_db = await Beatmap.get_or_fetch(db, fetcher, beatmap_id)
ruleset = beatmap_db.mode
key = (
f"beatmap:{beatmap}:{ruleset}:"
f"beatmap:{beatmap_id}:{ruleset}:"
f"{hashlib.md5(str(mods_).encode()).hexdigest()}:attributes"
)
if await redis.exists(key):
return BeatmapAttributes.model_validate_json(await redis.get(key)) # pyright: ignore[reportArgumentType]
try:
resp = await fetcher.get_or_fetch_beatmap_raw(redis, beatmap)
resp = await fetcher.get_or_fetch_beatmap_raw(redis, beatmap_id)
try:
attr = await asyncio.get_event_loop().run_in_executor(
None, calculate_beatmap_attribute, resp, ruleset, mods_

View File

@@ -10,16 +10,22 @@ from app.fetcher import Fetcher
from .router import router
from fastapi import Depends, Form, HTTPException, Query, Security
from fastapi import Depends, Form, HTTPException, Path, Query, Security
from fastapi.responses import RedirectResponse
from httpx import HTTPError
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
@router.get("/beatmapsets/lookup", tags=["beatmapset"], response_model=BeatmapsetResp)
@router.get(
"/beatmapsets/lookup",
tags=["谱面集"],
name="查询谱面集 (通过谱面 ID)",
response_model=BeatmapsetResp,
description=("通过谱面 ID 查询所属谱面集。"),
)
async def lookup_beatmapset(
beatmap_id: int = Query(),
beatmap_id: int = Query(description="谱面 ID"),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
@@ -31,15 +37,21 @@ async def lookup_beatmapset(
return resp
@router.get("/beatmapsets/{sid}", tags=["beatmapset"], response_model=BeatmapsetResp)
@router.get(
"/beatmapsets/{beatmapset_id}",
tags=["谱面集"],
name="获取谱面集详情",
response_model=BeatmapsetResp,
description="获取单个谱面集详情。",
)
async def get_beatmapset(
sid: int,
beatmapset_id: int = Path(..., description="谱面集 ID"),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
fetcher: Fetcher = Depends(get_fetcher),
):
try:
beatmapset = await Beatmapset.get_or_fetch(db, fetcher, sid)
beatmapset = await Beatmapset.get_or_fetch(db, fetcher, beatmapset_id)
return await BeatmapsetResp.from_db(
beatmapset, session=db, include=["recent_favourites"], user=current_user
)
@@ -47,35 +59,48 @@ async def get_beatmapset(
raise HTTPException(status_code=404, detail="Beatmapset not found")
@router.get("/beatmapsets/{beatmapset}/download", tags=["beatmapset"])
@router.get(
"/beatmapsets/{beatmapset_id}/download",
tags=["谱面集"],
name="下载谱面集",
description="**客户端专属**\n下载谱面集文件。若用户国家为 CN 则跳转国内镜像。",
)
async def download_beatmapset(
beatmapset: int,
no_video: bool = Query(True, alias="noVideo"),
beatmapset_id: int = Path(..., description="谱面集 ID"),
no_video: bool = Query(True, alias="noVideo", description="是否下载无视频版本"),
current_user: User = Security(get_current_user, scopes=["*"]),
):
if current_user.country_code == "CN":
return RedirectResponse(
f"https://txy1.sayobot.cn/beatmaps/download/"
f"{'novideo' if no_video else 'full'}/{beatmapset}?server=auto"
f"{'novideo' if no_video else 'full'}/{beatmapset_id}?server=auto"
)
else:
return RedirectResponse(
f"https://api.nerinyan.moe/d/{beatmapset}?noVideo={no_video}"
f"https://api.nerinyan.moe/d/{beatmapset_id}?noVideo={no_video}"
)
@router.post("/beatmapsets/{beatmapset}/favourites", tags=["beatmapset"])
@router.post(
"/beatmapsets/{beatmapset_id}/favourites",
tags=["谱面集"],
name="收藏或取消收藏谱面集",
description="**客户端专属**\n收藏或取消收藏指定谱面集。",
)
async def favourite_beatmapset(
beatmapset: int,
action: Literal["favourite", "unfavourite"] = Form(),
beatmapset_id: int = Path(..., description="谱面集 ID"),
action: Literal["favourite", "unfavourite"] = Form(
description="操作类型favourite 收藏 / unfavourite 取消收藏"
),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
):
assert current_user.id is not None
existing_favourite = (
await db.exec(
select(FavouriteBeatmapset).where(
FavouriteBeatmapset.user_id == current_user.id,
FavouriteBeatmapset.beatmapset_id == beatmapset,
FavouriteBeatmapset.beatmapset_id == beatmapset_id,
)
)
).first()
@@ -87,7 +112,7 @@ async def favourite_beatmapset(
if action == "favourite":
favourite = FavouriteBeatmapset(
user_id=current_user.id, beatmapset_id=beatmapset
user_id=current_user.id, beatmapset_id=beatmapset_id
)
db.add(favourite)
else:

View File

@@ -8,14 +8,19 @@ from app.models.score import GameMode
from .router import router
from fastapi import Depends, Security
from fastapi import Depends, Path, Security
from sqlmodel.ext.asyncio.session import AsyncSession
@router.get("/me/{ruleset}", response_model=UserResp)
@router.get("/me/", response_model=UserResp)
async def get_user_info_default(
ruleset: GameMode | None = None,
@router.get(
"/me/{ruleset}",
response_model=UserResp,
name="获取当前用户信息 (指定 ruleset)",
description="获取当前登录用户信息 (含指定 ruleset 统计)。",
tags=["用户"],
)
async def get_user_info_with_ruleset(
ruleset: GameMode = Path(description="指定 ruleset"),
current_user: User = Security(get_current_user, scopes=["identify"]),
session: AsyncSession = Depends(get_db),
):
@@ -25,3 +30,22 @@ async def get_user_info_default(
ALL_INCLUDED,
ruleset,
)
@router.get(
"/me/",
response_model=UserResp,
name="获取当前用户信息",
description="获取当前登录用户信息。",
tags=["用户"],
)
async def get_user_info_default(
current_user: User = Security(get_current_user, scopes=["identify"]),
session: AsyncSession = Depends(get_db),
):
return await UserResp.from_db(
current_user,
session,
ALL_INCLUDED,
None,
)

View File

@@ -10,15 +10,28 @@ from pydantic import BaseModel
class Background(BaseModel):
"""季节背景图单项。
- url: 图片链接地址。"""
url: str
class BackgroundsResp(BaseModel):
"""季节背景图返回模型。
- ends_at: 结束时间(若为远未来表示长期有效)。
- backgrounds: 背景图列表。"""
ends_at: datetime = datetime(year=9999, month=12, day=31, tzinfo=UTC)
backgrounds: list[Background]
@router.get("/seasonal-backgrounds", response_model=BackgroundsResp)
@router.get(
"/seasonal-backgrounds",
response_model=BackgroundsResp,
tags=["杂项"],
name="获取季节背景图列表",
description="获取当前季节背景图列表。",
)
async def get_seasonal_backgrounds():
return BackgroundsResp(
backgrounds=[Background(url=url) for url in settings.seasonal_backgrounds]

View File

@@ -6,14 +6,26 @@ from app.dependencies.user import get_current_user
from .router import router
from fastapi import Depends, HTTPException, Query, Request, Security
from fastapi import Depends, HTTPException, Path, Query, Request, Security
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
@router.get("/friends", tags=["relationship"], response_model=list[RelationshipResp])
@router.get("/blocks", tags=["relationship"], response_model=list[RelationshipResp])
@router.get(
"/friends",
tags=["用户关系"],
response_model=list[RelationshipResp],
name="获取好友列表",
description="获取当前用户的好友列表。",
)
@router.get(
"/blocks",
tags=["用户关系"],
response_model=list[RelationshipResp],
name="获取屏蔽列表",
description="获取当前用户的屏蔽用户列表。",
)
async def get_relationship(
request: Request,
current_user: User = Security(get_current_user, scopes=["friends.read"]),
@@ -34,17 +46,33 @@ async def get_relationship(
class AddFriendResp(BaseModel):
"""添加好友/屏蔽 返回模型。
- user_relation: 新的或更新后的关系对象。"""
user_relation: RelationshipResp
@router.post("/friends", tags=["relationship"], response_model=AddFriendResp)
@router.post("/blocks", tags=["relationship"])
@router.post(
"/friends",
tags=["用户关系"],
response_model=AddFriendResp,
name="添加或更新好友关系",
description="**客户端专属**\n添加或更新与目标用户的好友关系。",
)
@router.post(
"/blocks",
tags=["用户关系"],
name="添加或更新屏蔽关系",
description="**客户端专属**\n添加或更新与目标用户的屏蔽关系。",
)
async def add_relationship(
request: Request,
target: int = Query(),
target: int = Query(description="目标用户 ID"),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
):
assert current_user.id is not None
relationship_type = (
RelationshipType.FOLLOW
if request.url.path.endswith("/friends")
@@ -100,11 +128,21 @@ async def add_relationship(
)
@router.delete("/friends/{target}", tags=["relationship"])
@router.delete("/blocks/{target}", tags=["relationship"])
@router.delete(
"/friends/{target}",
tags=["用户关系"],
name="取消好友关系",
description="**客户端专属**\n删除与目标用户的好友关系。",
)
@router.delete(
"/blocks/{target}",
tags=["用户关系"],
name="取消屏蔽关系",
description="**客户端专属**\n删除与目标用户的屏蔽关系。",
)
async def delete_relationship(
request: Request,
target: int,
target: int = Path(..., description="目标用户 ID"),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
):

View File

@@ -20,7 +20,7 @@ from app.signalr.hub import MultiplayerHubs
from .router import router
from fastapi import Depends, HTTPException, Query, Security
from fastapi import Depends, HTTPException, Path, Query, Security
from pydantic import BaseModel, Field
from redis.asyncio import Redis
from sqlalchemy.sql.elements import ColumnElement
@@ -28,13 +28,29 @@ from sqlmodel import col, exists, select
from sqlmodel.ext.asyncio.session import AsyncSession
@router.get("/rooms", tags=["rooms"], response_model=list[RoomResp])
@router.get(
"/rooms",
tags=["房间"],
response_model=list[RoomResp],
name="获取房间列表",
description="获取房间列表。支持按状态/模式筛选",
)
async def get_all_rooms(
mode: Literal["open", "ended", "participated", "owned", None] = Query(
default="open"
default="open",
description=(
"房间模式open 当前开放 / ended 已经结束 / "
"participated 参与过 / owned 自己创建的房间"
),
),
category: RoomCategory = Query(RoomCategory.NORMAL),
status: RoomStatus | None = Query(None),
category: RoomCategory = Query(
RoomCategory.NORMAL,
description=(
"房间分类NORMAL 普通歌单模式房间 / REALTIME 多人游戏房间"
" / DAILY_CHALLENGE 每日挑战"
),
),
status: RoomStatus | None = Query(None, description="房间状态(可选)"),
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["public"]),
):
@@ -93,6 +109,9 @@ async def get_all_rooms(
class APICreatedRoom(RoomResp):
"""创建房间返回模型,继承 RoomResp。额外字段:
- error: 错误信息(为空表示成功)。"""
error: str = ""
@@ -120,32 +139,48 @@ async def _participate_room(
db_room.participant_count += 1
@router.post("/rooms", tags=["room"], response_model=APICreatedRoom)
@router.post(
"/rooms",
tags=["房间"],
response_model=APICreatedRoom,
name="创建房间",
description="**客户端专属**\n创建一个新的房间。",
)
async def create_room(
room: APIUploadedRoom,
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["*"]),
):
assert current_user.id is not None
user_id = current_user.id
db_room = await create_playlist_room_from_api(db, room, user_id)
await _participate_room(db_room.id, user_id, db_room, db)
# await db.commit()
# await db.refresh(db_room)
created_room = APICreatedRoom.model_validate(await RoomResp.from_db(db_room, db))
created_room.error = ""
return created_room
@router.get("/rooms/{room}", tags=["room"], response_model=RoomResp)
@router.get(
"/rooms/{room_id}",
tags=["房间"],
response_model=RoomResp,
name="获取房间详情",
description="获取单个房间详情。",
)
async def get_room(
room: int,
category: str = Query(default=""),
room_id: int = Path(..., description="房间 ID"),
category: str = Query(
default="",
description=(
"房间分类NORMAL 普通歌单模式房间 / REALTIME 多人游戏房间"
" / DAILY_CHALLENGE 每日挑战 (可选)"
),
),
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["*"]),
redis: Redis = Depends(get_redis),
):
# 直接从db获取信息毕竟都一样
db_room = (await db.exec(select(Room).where(Room.id == room))).first()
db_room = (await db.exec(select(Room).where(Room.id == room_id))).first()
if db_room is None:
raise HTTPException(404, "Room not found")
resp = await RoomResp.from_db(
@@ -154,13 +189,18 @@ async def get_room(
return resp
@router.delete("/rooms/{room}", tags=["room"])
@router.delete(
"/rooms/{room_id}",
tags=["房间"],
name="结束房间",
description="**客户端专属**\n结束歌单模式房间。",
)
async def delete_room(
room: int,
room_id: int = Path(..., description="房间 ID"),
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["*"]),
):
db_room = (await db.exec(select(Room).where(Room.id == room))).first()
db_room = (await db.exec(select(Room).where(Room.id == room_id))).first()
if db_room is None:
raise HTTPException(404, "Room not found")
else:
@@ -169,39 +209,48 @@ async def delete_room(
return None
@router.put("/rooms/{room}/users/{user}", tags=["room"])
@router.put(
"/rooms/{room_id}/users/{user_id}",
tags=["房间"],
name="加入房间",
description="**客户端专属**\n加入指定歌单模式房间。",
)
async def add_user_to_room(
room: int,
user: int,
room_id: int = Path(..., description="房间 ID"),
user_id: int = Path(..., description="用户 ID"),
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["*"]),
):
db_room = (await db.exec(select(Room).where(Room.id == room))).first()
db_room = (await db.exec(select(Room).where(Room.id == room_id))).first()
if db_room is not None:
await _participate_room(room, user, db_room, db)
await _participate_room(room_id, user_id, db_room, db)
await db.commit()
await db.refresh(db_room)
resp = await RoomResp.from_db(db_room, db)
return resp
else:
raise HTTPException(404, "room not found0")
@router.delete("/rooms/{room}/users/{user}", tags=["room"])
@router.delete(
"/rooms/{room_id}/users/{user_id}",
tags=["房间"],
name="离开房间",
description="**客户端专属**\n离开指定歌单模式房间。",
)
async def remove_user_from_room(
room: int,
user: int,
room_id: int = Path(..., description="房间 ID"),
user_id: int = Path(..., description="用户 ID"),
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["*"]),
):
db_room = (await db.exec(select(Room).where(Room.id == room))).first()
db_room = (await db.exec(select(Room).where(Room.id == room_id))).first()
if db_room is not None:
participated_user = (
await db.exec(
select(RoomParticipatedUser).where(
RoomParticipatedUser.room_id == room,
RoomParticipatedUser.user_id == user,
RoomParticipatedUser.room_id == room_id,
RoomParticipatedUser.user_id == user_id,
)
)
).first()
@@ -215,23 +264,32 @@ async def remove_user_from_room(
class APILeaderboard(BaseModel):
"""房间全局排行榜返回模型。
- leaderboard: 用户游玩统计(尝试次数/分数等)。
- user_score: 当前用户对应统计。"""
leaderboard: list[ItemAttemptsResp] = Field(default_factory=list)
user_score: ItemAttemptsResp | None = None
@router.get("/rooms/{room}/leaderboard", tags=["room"], response_model=APILeaderboard)
@router.get(
"/rooms/{room_id}/leaderboard",
tags=["房间"],
response_model=APILeaderboard,
name="获取房间排行榜",
description="获取房间内累计得分排行榜。",
)
async def get_room_leaderboard(
room: int,
room_id: int = Path(..., description="房间 ID"),
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["public"]),
):
db_room = (await db.exec(select(Room).where(Room.id == room))).first()
db_room = (await db.exec(select(Room).where(Room.id == room_id))).first()
if db_room is None:
raise HTTPException(404, "Room not found")
aggs = await db.exec(
select(ItemAttemptsCount)
.where(ItemAttemptsCount.room_id == room)
.where(ItemAttemptsCount.room_id == room_id)
.order_by(col(ItemAttemptsCount.total_score).desc())
)
aggs_resp = []
@@ -239,7 +297,6 @@ async def get_room_leaderboard(
for i, agg in enumerate(aggs):
resp = await ItemAttemptsResp.from_db(agg, db)
resp.position = i + 1
# resp.accuracy *= 100
aggs_resp.append(resp)
if agg.user_id == current_user.id:
user_agg = resp
@@ -250,6 +307,16 @@ async def get_room_leaderboard(
class RoomEvents(BaseModel):
"""房间事件流返回模型。
- beatmaps: 本次结果涉及的谱面列表。
- beatmapsets: 谱面集映射。
- current_playlist_item_id: 当前游玩列表(项目)项 ID。
- events: 事件列表。
- first_event_id / last_event_id: 事件范围。
- playlist_items: 房间游玩列表(项目)详情。
- room: 房间详情。
- user: 关联用户列表。"""
beatmaps: list[BeatmapResp] = Field(default_factory=list)
beatmapsets: dict[int, BeatmapsetResp] = Field(default_factory=dict)
current_playlist_item_id: int = 0
@@ -261,14 +328,20 @@ class RoomEvents(BaseModel):
user: list[UserResp] = Field(default_factory=list)
@router.get("/rooms/{room_id}/events", response_model=RoomEvents, tags=["room"])
@router.get(
"/rooms/{room_id}/events",
response_model=RoomEvents,
tags=["房间"],
name="获取房间事件",
description="获取房间事件列表 (倒序,可按 after / before 进行范围截取)。",
)
async def get_room_events(
room_id: int,
room_id: int = Path(..., description="房间 ID"),
db: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["public"]),
limit: int = Query(100, ge=1, le=1000),
after: int | None = Query(None, ge=0),
before: int | None = Query(None, ge=0),
limit: int = Query(100, ge=1, le=1000, description="返回条数 (1-1000)"),
after: int | None = Query(None, ge=0, description="仅包含大于该事件 ID 的事件"),
before: int | None = Query(None, ge=0, description="仅包含小于该事件 ID 的事件"),
):
events = (
await db.exec(
@@ -294,10 +367,8 @@ async def get_room_events(
current_playlist_item_id = 0
for event in events:
event_resps.append(MultiplayerEventResp.from_db(event))
if event.user_id:
user_ids.add(event.user_id)
if event.playlist_item_id is not None and (
playitem := (
await db.exec(
@@ -320,7 +391,6 @@ async def get_room_events(
for score in scores:
user_ids.add(score.user_id)
beatmap_ids.add(score.beatmap_id)
assert event.id is not None
first_event_id = min(first_event_id, event.id)
last_event_id = max(last_event_id, event.id)

View File

@@ -48,7 +48,7 @@ from app.storage.local import LocalStorageService
from .router import router
from fastapi import Body, Depends, Form, HTTPException, Query, Security
from fastapi import Body, Depends, Form, HTTPException, Path, Query, Security
from fastapi.responses import FileResponse, RedirectResponse
from httpx import HTTPError
from pydantic import BaseModel
@@ -129,17 +129,28 @@ class BeatmapScores(BaseModel):
@router.get(
"/beatmaps/{beatmap}/scores", tags=["beatmap"], response_model=BeatmapScores
"/beatmaps/{beatmap_id}/scores",
tags=["成绩"],
response_model=BeatmapScores,
name="获取谱面排行榜",
description="获取指定谱面在特定条件下的排行榜及当前用户成绩。",
)
async def get_beatmap_scores(
beatmap: int,
mode: GameMode,
legacy_only: bool = Query(None), # TODO:加入对这个参数的查询
mods: list[str] = Query(default_factory=set, alias="mods[]"),
type: LeaderboardType = Query(LeaderboardType.GLOBAL),
beatmap_id: int = Path(description="谱面 ID"),
mode: GameMode = Query(description="指定 auleset"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
mods: list[str] = Query(
default_factory=set, alias="mods[]", description="筛选使用的 Mods (可选,多值)"
),
type: LeaderboardType = Query(
LeaderboardType.GLOBAL,
description=(
"排行榜类型GLOBAL 全局 / COUNTRY 国家 / FRIENDS 好友 / TEAM 战队"
),
),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
limit: int = Query(50, ge=1, le=200),
limit: int = Query(50, ge=1, le=200, description="返回条数 (1-200)"),
):
if legacy_only:
raise HTTPException(
@@ -147,7 +158,7 @@ async def get_beatmap_scores(
)
all_scores, user_score = await get_leaderboard(
db, beatmap, mode, type=type, user=current_user, limit=limit, mods=mods
db, beatmap_id, mode, type=type, user=current_user, limit=limit, mods=mods
)
return BeatmapScores(
@@ -162,16 +173,18 @@ class BeatmapUserScore(BaseModel):
@router.get(
"/beatmaps/{beatmap}/scores/users/{user}",
tags=["beatmap"],
"/beatmaps/{beatmap_id}/scores/users/{user_id}",
tags=["成绩"],
response_model=BeatmapUserScore,
name="获取用户谱面最高成绩",
description="获取指定用户在指定谱面上的最高成绩。",
)
async def get_user_beatmap_score(
beatmap: int,
user: int,
legacy_only: bool = Query(None),
mode: str = Query(None),
mods: str = Query(None), # TODO:添加mods筛选
beatmap_id: int = Path(description="谱面 ID"),
user_id: int = Path(description="用户 ID"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
mode: str = Query(None, description="指定 ruleset (可选)"),
mods: str = Query(None, description="筛选使用的 Mods (暂未实现)"),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
):
@@ -184,8 +197,8 @@ async def get_user_beatmap_score(
select(Score)
.where(
Score.gamemode == mode if mode is not None else True,
Score.beatmap_id == beatmap,
Score.user_id == user,
Score.beatmap_id == beatmap_id,
Score.user_id == user_id,
)
.order_by(col(Score.total_score).desc())
)
@@ -193,7 +206,8 @@ async def get_user_beatmap_score(
if not user_score:
raise HTTPException(
status_code=404, detail=f"Cannot find user {user}'s score on this beatmap"
status_code=404,
detail=f"Cannot find user {user_id}'s score on this beatmap",
)
else:
resp = await ScoreResp.from_db(db, user_score)
@@ -204,15 +218,17 @@ async def get_user_beatmap_score(
@router.get(
"/beatmaps/{beatmap}/scores/users/{user}/all",
tags=["beatmap"],
"/beatmaps/{beatmap_id}/scores/users/{user_id}/all",
tags=["成绩"],
response_model=list[ScoreResp],
name="获取用户谱面全部成绩",
description="获取指定用户在指定谱面上的全部成绩列表。",
)
async def get_user_all_beatmap_scores(
beatmap: int,
user: int,
legacy_only: bool = Query(None),
ruleset: str = Query(None),
beatmap_id: int = Path(description="谱面 ID"),
user_id: int = Path(description="用户 ID"),
legacy_only: bool = Query(None, description="是否只查询 Stable 分数"),
ruleset: str = Query(None, description="指定 ruleset (可选)"),
current_user: User = Security(get_current_user, scopes=["public"]),
db: AsyncSession = Depends(get_db),
):
@@ -225,8 +241,8 @@ async def get_user_all_beatmap_scores(
select(Score)
.where(
Score.gamemode == ruleset if ruleset is not None else True,
Score.beatmap_id == beatmap,
Score.user_id == user,
Score.beatmap_id == beatmap_id,
Score.user_id == user_id,
)
.order_by(col(Score.classic_total_score).desc())
)
@@ -236,21 +252,25 @@ async def get_user_all_beatmap_scores(
@router.post(
"/beatmaps/{beatmap}/solo/scores", tags=["beatmap"], response_model=ScoreTokenResp
"/beatmaps/{beatmap_id}/solo/scores",
tags=["游玩"],
response_model=ScoreTokenResp,
name="创建单曲成绩提交令牌",
description="**客户端专属**\n为指定谱面创建一次性的成绩提交令牌。",
)
async def create_solo_score(
beatmap: int,
version_hash: str = Form(""),
beatmap_hash: str = Form(),
ruleset_id: int = Form(..., ge=0, le=3),
beatmap_id: int = Path(description="谱面 ID"),
version_hash: str = Form("", description="游戏版本哈希"),
beatmap_hash: str = Form(description="谱面文件哈希"),
ruleset_id: int = Form(..., ge=0, le=3, description="ruleset 数字 ID (0-3)"),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
):
assert current_user.id
assert current_user.id is not None
async with db:
score_token = ScoreToken(
user_id=current_user.id,
beatmap_id=beatmap,
beatmap_id=beatmap_id,
ruleset_id=INT_TO_MODE[ruleset_id],
)
db.add(score_token)
@@ -260,35 +280,43 @@ async def create_solo_score(
@router.put(
"/beatmaps/{beatmap}/solo/scores/{token}",
tags=["beatmap"],
"/beatmaps/{beatmap_id}/solo/scores/{token}",
tags=["游玩"],
response_model=ScoreResp,
name="提交单曲成绩",
description="**客户端专属**\n使用令牌提交单曲成绩。",
)
async def submit_solo_score(
beatmap: int,
token: int,
info: SoloScoreSubmissionInfo,
beatmap_id: int = Path(description="谱面 ID"),
token: int = Path(description="成绩令牌 ID"),
info: SoloScoreSubmissionInfo = Body(description="成绩提交信息"),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
fetcher=Depends(get_fetcher),
):
return await submit_score(info, beatmap, token, current_user, db, redis, fetcher)
assert current_user.id is not None
return await submit_score(info, beatmap_id, token, current_user, db, redis, fetcher)
@router.post(
"/rooms/{room_id}/playlist/{playlist_id}/scores", response_model=ScoreTokenResp
"/rooms/{room_id}/playlist/{playlist_id}/scores",
tags=["游玩"],
response_model=ScoreTokenResp,
name="创建房间项目成绩令牌",
description="**客户端专属**\n为房间游玩项目创建成绩提交令牌。",
)
async def create_playlist_score(
room_id: int,
playlist_id: int,
beatmap_id: int = Form(),
beatmap_hash: str = Form(),
ruleset_id: int = Form(..., ge=0, le=3),
version_hash: str = Form(""),
beatmap_id: int = Form(description="谱面 ID"),
beatmap_hash: str = Form(description="游戏版本哈希"),
ruleset_id: int = Form(..., ge=0, le=3, description="ruleset 数字 ID (0-3)"),
version_hash: str = Form("", description="谱面版本哈希"),
current_user: User = Security(get_current_user, scopes=["*"]),
session: AsyncSession = Depends(get_db),
):
assert current_user.id is not None
room = await session.get(Room, room_id)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
@@ -347,7 +375,12 @@ async def create_playlist_score(
return ScoreTokenResp.from_db(score_token)
@router.put("/rooms/{room_id}/playlist/{playlist_id}/scores/{token}")
@router.put(
"/rooms/{room_id}/playlist/{playlist_id}/scores/{token}",
tags=["游玩"],
name="提交房间项目成绩",
description="**客户端专属**\n提交房间游玩项目成绩。",
)
async def submit_playlist_score(
room_id: int,
playlist_id: int,
@@ -358,6 +391,7 @@ async def submit_playlist_score(
redis: Redis = Depends(get_redis),
fetcher: Fetcher = Depends(get_fetcher),
):
assert current_user.id is not None
item = (
await session.exec(
select(Playlist).where(
@@ -399,13 +433,19 @@ class IndexedScoreResp(MultiplayerScores):
@router.get(
"/rooms/{room_id}/playlist/{playlist_id}/scores", response_model=IndexedScoreResp
"/rooms/{room_id}/playlist/{playlist_id}/scores",
response_model=IndexedScoreResp,
name="获取房间项目排行榜",
description="获取房间游玩项目排行榜。",
tags=["成绩"],
)
async def index_playlist_scores(
room_id: int,
playlist_id: int,
limit: int = 50,
cursor: int = Query(2000000, alias="cursor[total_score]"),
limit: int = Query(50, ge=1, le=50, description="返回条数 (1-50)"),
cursor: int = Query(
2000000, alias="cursor[total_score]", description="分页游标(上一页最低分)"
),
current_user: User = Security(get_current_user, scopes=["public"]),
session: AsyncSession = Depends(get_db),
):
@@ -461,6 +501,9 @@ async def index_playlist_scores(
@router.get(
"/rooms/{room_id}/playlist/{playlist_id}/scores/{score_id}",
response_model=ScoreResp,
name="获取房间项目单个成绩",
description="获取指定房间游玩项目中单个成绩详情。",
tags=["成绩"],
)
async def show_playlist_score(
room_id: int,
@@ -525,6 +568,9 @@ async def show_playlist_score(
@router.get(
"rooms/{room_id}/playlist/{playlist_id}/scores/users/{user_id}",
response_model=ScoreResp,
name="获取房间项目用户成绩",
description="获取指定用户在房间游玩项目中的成绩。",
tags=["成绩"],
)
async def get_user_playlist_score(
room_id: int,
@@ -557,16 +603,22 @@ async def get_user_playlist_score(
return resp
@router.put("/score-pins/{score}", status_code=204)
@router.put(
"/score-pins/{score_id}",
status_code=204,
name="置顶成绩",
description="**客户端专属**\n将指定成绩置顶到用户主页 (按顺序)。",
tags=["成绩"],
)
async def pin_score(
score: int,
score_id: int = Path(description="成绩 ID"),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
):
score_record = (
await db.exec(
select(Score).where(
Score.id == score,
Score.id == score_id,
Score.user_id == current_user.id,
col(Score.passed).is_(True),
)
@@ -593,15 +645,21 @@ async def pin_score(
await db.commit()
@router.delete("/score-pins/{score}", status_code=204)
@router.delete(
"/score-pins/{score_id}",
status_code=204,
name="取消置顶成绩",
description="**客户端专属**\n取消置顶指定成绩。",
tags=["成绩"],
)
async def unpin_score(
score: int,
score_id: int = Path(description="成绩 ID"),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
):
score_record = (
await db.exec(
select(Score).where(Score.id == score, Score.user_id == current_user.id)
select(Score).where(Score.id == score_id, Score.user_id == current_user.id)
)
).first()
if not score_record:
@@ -623,17 +681,26 @@ async def unpin_score(
await db.commit()
@router.post("/score-pins/{score}/reorder", status_code=204)
@router.post(
"/score-pins/{score_id}/reorder",
status_code=204,
name="调整置顶成绩顺序",
description=(
"**客户端专属**\n调整已置顶成绩的展示顺序。"
"仅提供 after_score_id 或 before_score_id 之一。"
),
tags=["成绩"],
)
async def reorder_score_pin(
score: int,
after_score_id: int | None = Body(default=None),
before_score_id: int | None = Body(default=None),
score_id: int = Path(description="成绩 ID"),
after_score_id: int | None = Body(default=None, description="放在该成绩之后"),
before_score_id: int | None = Body(default=None, description="放在该成绩之前"),
current_user: User = Security(get_current_user, scopes=["*"]),
db: AsyncSession = Depends(get_db),
):
score_record = (
await db.exec(
select(Score).where(Score.id == score, Score.user_id == current_user.id)
select(Score).where(Score.id == score_id, Score.user_id == current_user.id)
)
).first()
if not score_record:
@@ -685,7 +752,7 @@ async def reorder_score_pin(
if current_order < target_order:
for s in all_pinned_scores:
if current_order < s.pinned_order <= target_order and s.id != score:
if current_order < s.pinned_order <= target_order and s.id != score_id:
updates.append((s.id, s.pinned_order - 1))
if after_score_id:
final_target = (
@@ -695,7 +762,7 @@ async def reorder_score_pin(
final_target = target_order
else:
for s in all_pinned_scores:
if target_order <= s.pinned_order < current_order and s.id != score:
if target_order <= s.pinned_order < current_order and s.id != score_id:
updates.append((s.id, s.pinned_order + 1))
final_target = target_order
@@ -712,7 +779,12 @@ async def reorder_score_pin(
await db.commit()
@router.get("/scores/{score_id}/download")
@router.get(
"/scores/{score_id}/download",
name="下载成绩回放",
description="下载指定成绩的回放文件。",
tags=["成绩"],
)
async def download_score_replay(
score_id: int,
current_user: User = Security(get_current_user, scopes=["public"]),

View File

@@ -20,7 +20,7 @@ from app.models.user import BeatmapsetType
from .router import router
from fastapi import Depends, HTTPException, Query, Security
from fastapi import Depends, HTTPException, Path, Query, Security
from pydantic import BaseModel
from sqlmodel import exists, false, select
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -31,13 +31,23 @@ class BatchUserResponse(BaseModel):
users: list[UserResp]
@router.get("/users", response_model=BatchUserResponse)
@router.get("/users/lookup", response_model=BatchUserResponse)
@router.get("/users/lookup/", response_model=BatchUserResponse)
@router.get(
"/users",
response_model=BatchUserResponse,
name="批量获取用户信息",
description="通过用户 ID 列表批量获取用户信息。",
tags=["用户"],
)
@router.get("/users/lookup", response_model=BatchUserResponse, include_in_schema=False)
@router.get("/users/lookup/", response_model=BatchUserResponse, include_in_schema=False)
async def get_users(
user_ids: list[int] = Query(default_factory=list, alias="ids[]"),
user_ids: list[int] = Query(
default_factory=list, alias="ids[]", description="要查询的用户 ID 列表"
),
current_user: User = Security(get_current_user, scopes=["public"]),
include_variant_statistics: bool = Query(default=False), # TODO: future use
include_variant_statistics: bool = Query(
default=False, description="是否包含各模式的统计信息"
), # TODO: future use
session: AsyncSession = Depends(get_db),
):
if user_ids:
@@ -58,21 +68,25 @@ async def get_users(
)
@router.get("/users/{user}/{ruleset}", response_model=UserResp)
@router.get("/users/{user}/", response_model=UserResp)
@router.get("/users/{user}", response_model=UserResp)
async def get_user_info(
user: str,
ruleset: GameMode | None = None,
@router.get(
"/users/{user_id}/{ruleset}",
response_model=UserResp,
name="获取用户信息(指定ruleset)",
description="通过用户 ID 或用户名获取单个用户的详细信息,并指定特定 ruleset。",
tags=["用户"],
)
async def get_user_info_ruleset(
user_id: str = Path(description="用户 ID 或用户名"),
ruleset: GameMode | None = Path(description="指定 ruleset"),
session: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["public"]),
):
searched_user = (
await session.exec(
select(User).where(
User.id == int(user)
if user.isdigit()
else User.username == user.removeprefix("@")
User.id == int(user_id)
if user_id.isdigit()
else User.username == user_id.removeprefix("@")
)
)
).first()
@@ -86,17 +100,51 @@ async def get_user_info(
)
@router.get("/users/{user_id}/", response_model=UserResp, include_in_schema=False)
@router.get(
"/users/{user_id}",
response_model=UserResp,
name="获取用户信息",
description="通过用户 ID 或用户名获取单个用户的详细信息。",
tags=["用户"],
)
async def get_user_info(
user_id: str = Path(description="用户 ID 或用户名"),
session: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["public"]),
):
searched_user = (
await session.exec(
select(User).where(
User.id == int(user_id)
if user_id.isdigit()
else User.username == user_id.removeprefix("@")
)
)
).first()
if not searched_user:
raise HTTPException(404, detail="User not found")
return await UserResp.from_db(
searched_user,
session,
include=SEARCH_INCLUDED,
)
@router.get(
"/users/{user_id}/beatmapsets/{type}",
response_model=list[BeatmapsetResp | BeatmapPlaycountsResp],
name="获取用户谱面集列表",
description="获取指定用户特定类型的谱面集列表,如最常游玩、收藏等。",
tags=["用户"],
)
async def get_user_beatmapsets(
user_id: int,
type: BeatmapsetType,
user_id: int = Path(description="用户 ID"),
type: BeatmapsetType = Path(description="谱面集类型"),
current_user: User = Security(get_current_user, scopes=["public"]),
session: AsyncSession = Depends(get_db),
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000, description="返回条数 (1-1000)"),
offset: int = Query(0, ge=0, description="偏移量"),
):
if type in {
BeatmapsetType.GRAVEYARD,
@@ -139,19 +187,32 @@ async def get_user_beatmapsets(
return resp
@router.get("/users/{user}/scores/{type}", response_model=list[ScoreResp])
@router.get(
"/users/{user_id}/scores/{type}",
response_model=list[ScoreResp],
name="获取用户成绩列表",
description="获取用户特定类型的成绩列表,如最好成绩、最近成绩等。",
tags=["用户"],
)
async def get_user_scores(
user: int,
type: Literal["best", "recent", "firsts", "pinned"],
legacy_only: bool = Query(False),
include_fails: bool = Query(False),
mode: GameMode | None = None,
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
user_id: int = Path(description="用户 ID"),
type: Literal["best", "recent", "firsts", "pinned"] = Path(
description=(
"成绩类型: best 最好成绩 / recent 最近 24h 游玩成绩"
" / firsts 第一名成绩 / pinned 置顶成绩"
)
),
legacy_only: bool = Query(False, description="是否只查询 Stable 成绩"),
include_fails: bool = Query(False, description="是否包含失败的成绩"),
mode: GameMode | None = Query(
None, description="指定 ruleset (可选,默认为用户主模式)"
),
limit: int = Query(100, ge=1, le=1000, description="返回条数 (1-1000)"),
offset: int = Query(0, ge=0, description="偏移量"),
session: AsyncSession = Depends(get_db),
current_user: User = Security(get_current_user, scopes=["public"]),
):
db_user = await session.get(User, user)
db_user = await session.get(User, user_id)
if not db_user:
raise HTTPException(404, detail="User not found")

View File

@@ -18,7 +18,7 @@ from fastapi import APIRouter, Depends, Header, HTTPException, Query, WebSocket
from fastapi.security import SecurityScopes
from sqlmodel.ext.asyncio.session import AsyncSession
router = APIRouter(prefix="/signalr", tags=["SignalR"])
router = APIRouter(prefix="/signalr", include_in_schema=False)
@router.post("/{hub}/negotiate", response_model=NegotiateResponse)

11
main.py
View File

@@ -38,7 +38,12 @@ async def lifespan(app: FastAPI):
await redis_client.aclose()
app = FastAPI(title="osu! API 模拟服务器", version="1.0.0", lifespan=lifespan)
app = FastAPI(
title="osu! API 模拟服务器",
version="1.0.0",
lifespan=lifespan,
summary="osu! API 模拟服务器,支持 osu! API v2 和 osu!lazer 的绝大部分功能。官方文档https://osu.ppy.sh/docs/index.html",
)
app.include_router(api_v2_router)
app.include_router(signalr_router)
@@ -56,13 +61,13 @@ app.add_middleware(
)
@app.get("/")
@app.get("/", include_in_schema=False)
async def root():
"""根端点"""
return {"message": "osu! API 模拟服务器正在运行"}
@app.get("/health")
@app.get("/health", include_in_schema=False)
async def health_check():
"""健康检查端点"""
return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}