chore(merge): merge branch 'main' of https://github.com/GooGuTeam/osu_lazer_api

This commit is contained in:
MingxuanGame
2025-07-28 16:06:31 +00:00
2 changed files with 109 additions and 37 deletions

View File

@@ -2,24 +2,48 @@ from __future__ import annotations
from typing import Literal from typing import Literal
from app.database import ( from app.database import User as DBUser
User as DBUser,
)
from app.dependencies.database import get_db from app.dependencies.database import get_db
from app.dependencies import get_current_user
from app.models.score import INT_TO_MODE from app.models.score import INT_TO_MODE
from app.models.user import ( from app.models.user import User as ApiUser
User as ApiUser,
)
from app.utils import convert_db_user_to_api_user from app.utils import convert_db_user_to_api_user
from .api_router import router from .api_router import router
from fastapi import Depends, HTTPException, Query from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.sql.expression import col from sqlmodel.sql.expression import col
# ---------- Shared Utility ----------
async def get_user_by_lookup(
db: AsyncSession,
lookup: str,
key: str = "id"
) -> DBUser | None:
"""根据查找方式获取用户"""
if key == "id":
try:
user_id = int(lookup)
result = await db.exec(
select(DBUser).where(DBUser.id == user_id)
)
return result.first()
except ValueError:
return None
elif key == "username":
result = await db.exec(
select(DBUser).where(DBUser.name == lookup)
)
return result.first()
else:
return None
# ---------- Batch Users ----------
class BatchUserResponse(BaseModel): class BatchUserResponse(BaseModel):
users: list[ApiUser] users: list[ApiUser]
@@ -28,7 +52,7 @@ class BatchUserResponse(BaseModel):
@router.get("/users/lookup", response_model=BatchUserResponse) @router.get("/users/lookup", response_model=BatchUserResponse)
async def get_users( async def get_users(
user_ids: list[int] = Query(default_factory=list, alias="ids[]"), user_ids: list[int] = Query(default_factory=list, alias="ids[]"),
include_variant_statistics: bool = Query(default=False), # TODO include_variant_statistics: bool = Query(default=False), # TODO: future use
session: AsyncSession = Depends(get_db), session: AsyncSession = Depends(get_db),
): ):
if user_ids: if user_ids:
@@ -51,8 +75,41 @@ async def get_users(
) )
# ---------- Individual User ----------
@router.get("/users/{user_lookup}/{mode}", response_model=ApiUser)
@router.get("/users/{user_lookup}/{mode}/", response_model=ApiUser)
async def get_user_with_mode(
user_lookup: str,
mode: Literal["osu", "taiko", "fruits", "mania"],
key: Literal["id", "username"] = Query("id"),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""获取指定游戏模式的用户信息"""
user = await get_user_by_lookup(db, user_lookup, key)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return await convert_db_user_to_api_user(user, mode)
@router.get("/users/{user_lookup}", response_model=ApiUser)
@router.get("/users/{user_lookup}/", response_model=ApiUser)
async def get_user_default(
user_lookup: str,
key: Literal["id", "username"] = Query("id"),
current_user: DBUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""获取用户信息默认使用osu模式但包含所有模式的统计信息"""
user = await get_user_by_lookup(db, user_lookup, key)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return await convert_db_user_to_api_user(user, "osu")
@router.get("/users/{user}/{ruleset}", response_model=ApiUser) @router.get("/users/{user}/{ruleset}", response_model=ApiUser)
@router.get("/users/{user}", response_model=ApiUser)
async def get_user_info( async def get_user_info(
user: str, user: str,
ruleset: Literal["osu", "taiko", "fruits", "mania"] = "osu", ruleset: Literal["osu", "taiko", "fruits", "mania"] = "osu",

View File

@@ -14,6 +14,7 @@ from app.models.score import GameMode
from app.models.user import ( from app.models.user import (
Country, Country,
Cover, Cover,
DailyChallengeStats,
GradeCounts, GradeCounts,
Kudosu, Kudosu,
Level, Level,
@@ -115,34 +116,37 @@ async def convert_db_user_to_api_user(db_user: DBUser, ruleset: str = "osu") ->
# 转换所有模式的统计信息 # 转换所有模式的统计信息
statistics_rulesets = {} statistics_rulesets = {}
for stat in db_user.statistics: if db_user.lazer_statistics:
statistics_rulesets[stat.mode] = Statistics( for stat in db_user.lazer_statistics:
count_100=stat.count_100, statistics_rulesets[stat.mode] = Statistics(
count_300=stat.count_300, count_100=stat.count_100,
count_50=stat.count_50, count_300=stat.count_300,
count_miss=stat.count_miss, count_50=stat.count_50,
level=Level(current=stat.level_current, progress=stat.level_progress), count_miss=stat.count_miss,
global_rank=stat.global_rank, level=Level(current=stat.level_current, progress=stat.level_progress),
global_rank_exp=stat.global_rank_exp, global_rank=stat.global_rank,
pp=stat.pp, global_rank_exp=stat.global_rank_exp,
pp_exp=stat.pp_exp, pp=float(stat.pp) if stat.pp else 0.0,
ranked_score=stat.ranked_score, pp_exp=float(stat.pp_exp) if stat.pp_exp else 0.0,
hit_accuracy=stat.hit_accuracy, ranked_score=stat.ranked_score,
play_count=stat.play_count, hit_accuracy=float(stat.hit_accuracy) if stat.hit_accuracy else 0.0,
play_time=stat.play_time, play_count=stat.play_count,
total_score=stat.total_score, play_time=stat.play_time,
total_hits=stat.total_hits, total_score=stat.total_score,
maximum_combo=stat.maximum_combo, total_hits=stat.total_hits,
replays_watched_by_others=stat.replays_watched_by_others, maximum_combo=stat.maximum_combo,
is_ranked=stat.is_ranked, replays_watched_by_others=stat.replays_watched_by_others,
grade_counts=GradeCounts( is_ranked=stat.is_ranked,
ss=stat.grade_ss, grade_counts=GradeCounts(
ssh=stat.grade_ssh, ss=stat.grade_ss,
s=stat.grade_s, ssh=stat.grade_ssh,
sh=stat.grade_sh, s=stat.grade_s,
a=stat.grade_a, sh=stat.grade_sh,
), a=stat.grade_a,
) ),
country_rank=stat.country_rank,
rank={"country": stat.country_rank} if stat.country_rank else None,
)
# 转换国家信息 # 转换国家信息
country = Country(code=user_country_code, name=get_country_name(user_country_code)) country = Country(code=user_country_code, name=get_country_name(user_country_code))
@@ -401,7 +405,18 @@ async def convert_db_user_to_api_user(db_user: DBUser, ruleset: str = "osu") ->
active_tournament_banners=active_tournament_banners, active_tournament_banners=active_tournament_banners,
badges=badges, badges=badges,
current_season_stats=None, current_season_stats=None,
daily_challenge_user_stats=None, daily_challenge_user_stats=DailyChallengeStats(
user_id=user_id,
daily_streak_best=db_user.daily_challenge_stats.daily_streak_best if db_user.daily_challenge_stats else 0,
daily_streak_current=db_user.daily_challenge_stats.daily_streak_current if db_user.daily_challenge_stats else 0,
last_update=db_user.daily_challenge_stats.last_update if db_user.daily_challenge_stats else None,
last_weekly_streak=db_user.daily_challenge_stats.last_weekly_streak if db_user.daily_challenge_stats else None,
playcount=db_user.daily_challenge_stats.playcount if db_user.daily_challenge_stats else 0,
top_10p_placements=db_user.daily_challenge_stats.top_10p_placements if db_user.daily_challenge_stats else 0,
top_50p_placements=db_user.daily_challenge_stats.top_50p_placements if db_user.daily_challenge_stats else 0,
weekly_streak_best=db_user.daily_challenge_stats.weekly_streak_best if db_user.daily_challenge_stats else 0,
weekly_streak_current=db_user.daily_challenge_stats.weekly_streak_current if db_user.daily_challenge_stats else 0,
),
groups=[], groups=[],
monthly_playcounts=monthly_playcounts, monthly_playcounts=monthly_playcounts,
page=Page(html=profile.page_html or "", raw=profile.page_raw or "") page=Page(html=profile.page_html or "", raw=profile.page_raw or "")