Files
g0v0-server/app/database/team.py

145 lines
5.7 KiB
Python

from datetime import datetime
from typing import TYPE_CHECKING
from app.models.model import UTCBaseModel
from app.models.score import GameMode
from app.utils import utcnow
from sqlalchemy import Column, DateTime
from sqlmodel import BigInteger, Field, ForeignKey, Relationship, SQLModel, Text, col, func, select
from sqlmodel.ext.asyncio.session import AsyncSession
if TYPE_CHECKING:
from .user import User
class TeamBase(SQLModel, UTCBaseModel):
id: int = Field(default=None, primary_key=True, index=True)
name: str = Field(max_length=100)
short_name: str = Field(max_length=10)
flag_url: str | None = Field(default=None)
cover_url: str | None = Field(default=None)
created_at: datetime = Field(default_factory=utcnow, sa_column=Column(DateTime))
leader_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id")))
description: str | None = Field(default=None, sa_column=Column(Text))
playmode: GameMode = Field(default=GameMode.OSU)
website: str | None = Field(default=None, sa_column=Column(Text))
class Team(TeamBase, table=True):
__tablename__: str = "teams"
leader: "User" = Relationship()
members: list["TeamMember"] = Relationship(back_populates="team")
class TeamResp(TeamBase):
rank: int = 0
pp: float = 0.0
ranked_score: int = 0
total_play_count: int = 0
member_count: int = 0
@classmethod
async def from_db(cls, team: Team, session: AsyncSession, gamemode: GameMode | None = None) -> "TeamResp":
from .statistics import UserStatistics
from .user import User
playmode = gamemode or team.playmode
pp_expr = func.coalesce(func.sum(col(UserStatistics.pp)), 0.0)
ranked_score_expr = func.coalesce(func.sum(col(UserStatistics.ranked_score)), 0)
play_count_expr = func.coalesce(func.sum(col(UserStatistics.play_count)), 0)
member_count_expr = func.count(func.distinct(col(UserStatistics.user_id)))
team_stats_stmt = (
select(pp_expr, ranked_score_expr, play_count_expr, member_count_expr)
.select_from(UserStatistics)
.join(TeamMember, col(TeamMember.user_id) == col(UserStatistics.user_id))
.join(User, col(User.id) == col(UserStatistics.user_id))
.join(Team, col(Team.id) == col(TeamMember.team_id))
.where(
col(Team.id) == team.id,
col(Team.playmode) == playmode,
col(UserStatistics.mode) == playmode,
col(UserStatistics.pp) > 0,
col(UserStatistics.is_ranked).is_(True),
~User.is_restricted_query(col(UserStatistics.user_id)),
)
)
team_stats_result = await session.exec(team_stats_stmt)
stats_row = team_stats_result.one_or_none()
if stats_row is None:
total_pp = 0.0
total_ranked_score = 0
total_play_count = 0
active_member_count = 0
else:
total_pp, total_ranked_score, total_play_count, active_member_count = stats_row
total_pp = float(total_pp or 0.0)
total_ranked_score = int(total_ranked_score or 0)
total_play_count = int(total_play_count or 0)
active_member_count = int(active_member_count or 0)
total_pp_ranking_expr = func.coalesce(func.sum(col(UserStatistics.pp)), 0.0)
ranking_stmt = (
select(Team.id, total_pp_ranking_expr)
.select_from(Team)
.join(TeamMember, col(TeamMember.team_id) == col(Team.id))
.join(UserStatistics, col(UserStatistics.user_id) == col(TeamMember.user_id))
.join(User, col(User.id) == col(TeamMember.user_id))
.where(
col(Team.playmode) == playmode,
col(UserStatistics.mode) == playmode,
col(UserStatistics.pp) > 0,
col(UserStatistics.is_ranked).is_(True),
~User.is_restricted_query(col(UserStatistics.user_id)),
)
.group_by(col(Team.id))
.order_by(total_pp_ranking_expr.desc())
)
ranking_result = await session.exec(ranking_stmt)
ranking_rows = ranking_result.all()
rank = 0
for index, (team_id, _) in enumerate(ranking_rows, start=1):
if team_id == team.id:
rank = index
break
data = team.model_dump()
data.update(
{
"pp": total_pp,
"ranked_score": total_ranked_score,
"total_play_count": total_play_count,
"member_count": active_member_count,
"rank": rank,
}
)
return cls.model_validate(data)
class TeamMember(SQLModel, UTCBaseModel, table=True):
__tablename__: str = "team_members"
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), primary_key=True))
team_id: int = Field(foreign_key="teams.id")
joined_at: datetime = Field(default_factory=utcnow, sa_column=Column(DateTime))
user: "User" = Relationship(back_populates="team_membership", sa_relationship_kwargs={"lazy": "joined"})
team: "Team" = Relationship(back_populates="members", sa_relationship_kwargs={"lazy": "joined"})
class TeamRequest(SQLModel, UTCBaseModel, table=True):
__tablename__: str = "team_requests"
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), primary_key=True))
team_id: int = Field(foreign_key="teams.id", primary_key=True)
requested_at: datetime = Field(default_factory=utcnow, sa_column=Column(DateTime))
user: "User" = Relationship(sa_relationship_kwargs={"lazy": "joined"})
team: "Team" = Relationship(sa_relationship_kwargs={"lazy": "joined"})