add info api

This commit is contained in:
咕谷酱
2025-07-28 21:22:29 +08:00
parent 80310d450b
commit ae6ef11350
3 changed files with 119 additions and 29 deletions

View File

@@ -8,6 +8,7 @@ from . import ( # pyright: ignore[reportUnusedImport] # noqa: F401
me,
relationship,
score,
user,
)
from .api_router import router as api_router
from .auth import router as auth_router

74
app/router/user.py Normal file
View File

@@ -0,0 +1,74 @@
from __future__ import annotations
from typing import Literal
from app.database import User as DBUser
from app.dependencies import get_db, get_current_user
from app.models.user import User as ApiUser
from app.utils import convert_db_user_to_api_user
from .api_router import router
from fastapi import Depends, HTTPException, Query
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
async def get_user_by_lookup(
db: AsyncSession,
lookup: str,
key: str = "id"
) -> DBUser | None:
"""根据查找方式获取用户"""
if key == "id":
try:
user_id = int(lookup)
result = await db.exec(
select(DBUser).where(DBUser.id == user_id)
)
return result.first()
except ValueError:
return None
elif key == "username":
result = await db.exec(
select(DBUser).where(DBUser.name == lookup)
)
return result.first()
else:
return None
@router.get("/users/{user_lookup}/{mode}", response_model=ApiUser)
@router.get("/users/{user_lookup}/{mode}/", response_model=ApiUser)
async def get_user_with_mode(
user_lookup: str,
mode: Literal["osu", "taiko", "fruits", "mania"],
key: Literal["id", "username"] = Query("id"),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""获取指定游戏模式的用户信息"""
user = await get_user_by_lookup(db, user_lookup, key)
if not user:
raise HTTPException(status_code=404, detail="User not found")
api_user = await convert_db_user_to_api_user(user, mode)
return api_user
@router.get("/users/{user_lookup}", response_model=ApiUser)
@router.get("/users/{user_lookup}/", response_model=ApiUser)
async def get_user_default(
user_lookup: str,
key: Literal["id", "username"] = Query("id"),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""获取用户信息默认使用osu模式但包含所有模式的统计信息"""
user = await get_user_by_lookup(db, user_lookup, key)
if not user:
raise HTTPException(status_code=404, detail="User not found")
api_user = await convert_db_user_to_api_user(user, "osu")
return api_user

View File

@@ -14,6 +14,7 @@ from app.models.score import GameMode
from app.models.user import (
Country,
Cover,
DailyChallengeStats,
GradeCounts,
Kudosu,
Level,
@@ -115,34 +116,37 @@ async def convert_db_user_to_api_user(db_user: DBUser, ruleset: str = "osu") ->
# 转换所有模式的统计信息
statistics_rulesets = {}
for stat in db_user.statistics:
statistics_rulesets[stat.mode] = Statistics(
count_100=stat.count_100,
count_300=stat.count_300,
count_50=stat.count_50,
count_miss=stat.count_miss,
level=Level(current=stat.level_current, progress=stat.level_progress),
global_rank=stat.global_rank,
global_rank_exp=stat.global_rank_exp,
pp=stat.pp,
pp_exp=stat.pp_exp,
ranked_score=stat.ranked_score,
hit_accuracy=stat.hit_accuracy,
play_count=stat.play_count,
play_time=stat.play_time,
total_score=stat.total_score,
total_hits=stat.total_hits,
maximum_combo=stat.maximum_combo,
replays_watched_by_others=stat.replays_watched_by_others,
is_ranked=stat.is_ranked,
grade_counts=GradeCounts(
ss=stat.grade_ss,
ssh=stat.grade_ssh,
s=stat.grade_s,
sh=stat.grade_sh,
a=stat.grade_a,
),
)
if db_user.lazer_statistics:
for stat in db_user.lazer_statistics:
statistics_rulesets[stat.mode] = Statistics(
count_100=stat.count_100,
count_300=stat.count_300,
count_50=stat.count_50,
count_miss=stat.count_miss,
level=Level(current=stat.level_current, progress=stat.level_progress),
global_rank=stat.global_rank,
global_rank_exp=stat.global_rank_exp,
pp=float(stat.pp) if stat.pp else 0.0,
pp_exp=float(stat.pp_exp) if stat.pp_exp else 0.0,
ranked_score=stat.ranked_score,
hit_accuracy=float(stat.hit_accuracy) if stat.hit_accuracy else 0.0,
play_count=stat.play_count,
play_time=stat.play_time,
total_score=stat.total_score,
total_hits=stat.total_hits,
maximum_combo=stat.maximum_combo,
replays_watched_by_others=stat.replays_watched_by_others,
is_ranked=stat.is_ranked,
grade_counts=GradeCounts(
ss=stat.grade_ss,
ssh=stat.grade_ssh,
s=stat.grade_s,
sh=stat.grade_sh,
a=stat.grade_a,
),
country_rank=stat.country_rank,
rank={"country": stat.country_rank} if stat.country_rank else None,
)
# 转换国家信息
country = Country(code=user_country_code, name=get_country_name(user_country_code))
@@ -401,7 +405,18 @@ async def convert_db_user_to_api_user(db_user: DBUser, ruleset: str = "osu") ->
active_tournament_banners=active_tournament_banners,
badges=badges,
current_season_stats=None,
daily_challenge_user_stats=None,
daily_challenge_user_stats=DailyChallengeStats(
user_id=user_id,
daily_streak_best=db_user.daily_challenge_stats.daily_streak_best if db_user.daily_challenge_stats else 0,
daily_streak_current=db_user.daily_challenge_stats.daily_streak_current if db_user.daily_challenge_stats else 0,
last_update=db_user.daily_challenge_stats.last_update if db_user.daily_challenge_stats else None,
last_weekly_streak=db_user.daily_challenge_stats.last_weekly_streak if db_user.daily_challenge_stats else None,
playcount=db_user.daily_challenge_stats.playcount if db_user.daily_challenge_stats else 0,
top_10p_placements=db_user.daily_challenge_stats.top_10p_placements if db_user.daily_challenge_stats else 0,
top_50p_placements=db_user.daily_challenge_stats.top_50p_placements if db_user.daily_challenge_stats else 0,
weekly_streak_best=db_user.daily_challenge_stats.weekly_streak_best if db_user.daily_challenge_stats else 0,
weekly_streak_current=db_user.daily_challenge_stats.weekly_streak_current if db_user.daily_challenge_stats else 0,
),
groups=[],
monthly_playcounts=monthly_playcounts,
page=Page(html=profile.page_html or "", raw=profile.page_raw or "")