Files
g0v0-server/app/router/user.py

133 lines
3.9 KiB
Python

from __future__ import annotations
from app.database import (
BeatmapPlaycounts,
BeatmapPlaycountsResp,
BeatmapsetResp,
User,
UserResp,
)
from app.database.lazer_user import SEARCH_INCLUDED
from app.dependencies.database import get_db
from app.dependencies.user import get_current_user
from app.models.score import GameMode
from app.models.user import BeatmapsetType
from .api_router import router
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.sql.expression import col
# Response envelope used as the response_model of the batch user lookup
# routes ("/users", "/users/lookup", "/users/lookup/").
class BatchUserResponse(BaseModel):
    # Serialized user payloads returned to the client.
    users: list[UserResp]
@router.get("/users", response_model=BatchUserResponse)
@router.get("/users/lookup", response_model=BatchUserResponse)
@router.get("/users/lookup/", response_model=BatchUserResponse)
async def get_users(
    user_ids: list[int] = Query(default_factory=list, alias="ids[]"),
    include_variant_statistics: bool = Query(default=False),  # TODO: future use
    session: AsyncSession = Depends(get_db),
):
    """Return up to 50 users, optionally restricted to the requested ids.

    Args:
        user_ids: Ids read from the repeated ``ids[]`` query parameter. When
            the list is empty no id filter is applied to the query.
        include_variant_statistics: Accepted but not used by this handler yet.
        session: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``BatchUserResponse`` holding one ``UserResp`` per selected row.
    """
    # Build the statement incrementally: the row cap always applies, the id
    # filter only when the caller actually supplied ids.
    statement = select(User).limit(50)
    if user_ids:
        statement = statement.where(col(User.id).in_(user_ids))
    rows = (await session.exec(statement)).all()

    # Serialize sequentially; every conversion awaits on the shared session.
    payloads = []
    for row in rows:
        payloads.append(
            await UserResp.from_db(row, session, include=SEARCH_INCLUDED)
        )
    return BatchUserResponse(users=payloads)
@router.get("/users/{user}/{ruleset}", response_model=UserResp)
@router.get("/users/{user}/", response_model=UserResp)
@router.get("/users/{user}", response_model=UserResp)
async def get_user_info(
    user: str,
    ruleset: GameMode | None = None,
    session: AsyncSession = Depends(get_db),
):
    """Look up a single user by numeric id or by username.

    Args:
        user: Path segment identifying the user. A value made only of decimal
            digits is treated as a user id; anything else is treated as a
            username, with one leading ``@`` removed if present.
        ruleset: Optional game mode forwarded to ``UserResp.from_db``.
        session: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``UserResp`` built from the matching ``User`` row.

    Raises:
        HTTPException: 404 when no user matches.
    """
    # str.isdecimal() accepts exactly the characters int() can parse.
    # str.isdigit() is also true for characters such as superscript digits
    # ("²"), for which int() raises ValueError and the request would fail
    # with an unhandled server error instead of a clean 404.
    if user.isdecimal():
        condition = User.id == int(user)
    else:
        condition = User.username == user.removeprefix("@")

    searched_user = (await session.exec(select(User).where(condition))).first()
    if not searched_user:
        raise HTTPException(404, detail="User not found")
    return await UserResp.from_db(
        searched_user,
        session,
        include=SEARCH_INCLUDED,
        ruleset=ruleset,
    )
@router.get(
    "/users/{user_id}/beatmapsets/{type}",
    response_model=list[BeatmapsetResp | BeatmapPlaycountsResp],
)
async def get_user_beatmapsets(
    user_id: int,
    type: BeatmapsetType,
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_db),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """List beatmapsets (or beatmap playcounts) associated with a user.

    Args:
        user_id: Id of the user whose beatmapsets are requested.
        type: Which collection to return. The name shadows the builtin but is
            bound to the ``{type}`` path parameter, so it cannot be renamed.
        current_user: Authenticated caller, passed to ``BeatmapsetResp.from_db``.
        session: Database session supplied by the ``get_db`` dependency.
        limit: Maximum number of entries to return (1-1000, default 100).
        offset: Number of entries to skip before the returned page.

    Returns:
        An empty list for the mapping-related types (not implemented yet), a
        list of ``BeatmapsetResp`` for favourites, or a list of
        ``BeatmapPlaycountsResp`` for most-played beatmaps.

    Raises:
        HTTPException: 404 when favourites are requested for an unknown user;
            400 when ``type`` matches none of the handled values.
    """
    if type in {
        BeatmapsetType.GRAVEYARD,
        BeatmapsetType.GUEST,
        BeatmapsetType.LOVED,
        BeatmapsetType.NOMINATED,
        BeatmapsetType.PENDING,
        BeatmapsetType.RANKED,
    }:
        # TODO: mapping, modding
        resp = []
    elif type == BeatmapsetType.FAVOURITE:
        user = await session.get(User, user_id)
        if not user:
            raise HTTPException(404, detail="User not found")
        favourites = await user.awaitable_attrs.favourite_beatmapsets
        # Honour limit/offset here as the most-played branch does; previously
        # they were validated but ignored, so every favourite was serialized
        # and returned regardless of the requested page.
        page = favourites[offset : offset + limit]
        resp = [
            await BeatmapsetResp.from_db(
                favourite.beatmapset, session=session, user=current_user
            )
            for favourite in page
        ]
    elif type == BeatmapsetType.MOST_PLAYED:
        # Pagination is pushed down to the database for this branch.
        most_played = await session.exec(
            select(BeatmapPlaycounts)
            .where(BeatmapPlaycounts.user_id == user_id)
            .order_by(col(BeatmapPlaycounts.playcount).desc())
            .limit(limit)
            .offset(offset)
        )
        resp = [
            await BeatmapPlaycountsResp.from_db(most_played_beatmap)
            for most_played_beatmap in most_played
        ]
    else:
        raise HTTPException(400, detail="Invalid beatmapset type")
    return resp