diff --git a/app/database/achievement.py b/app/database/achievement.py new file mode 100644 index 0000000..4be587f --- /dev/null +++ b/app/database/achievement.py @@ -0,0 +1,40 @@ +from datetime import UTC, datetime +from typing import TYPE_CHECKING + +from app.models.model import UTCBaseModel + +from sqlmodel import ( + BigInteger, + Column, + DateTime, + Field, + ForeignKey, + Relationship, + SQLModel, +) + +if TYPE_CHECKING: + from .lazer_user import User + + +class UserAchievementBase(SQLModel, UTCBaseModel): + achievement_id: int = Field(primary_key=True) + achieved_at: datetime = Field( + default=datetime.now(UTC), sa_column=Column(DateTime(timezone=True)) + ) + + +class UserAchievement(UserAchievementBase, table=True): + __tablename__ = "lazer_user_achievements" # pyright: ignore[reportAssignmentType] + + id: int | None = Field(default=None, primary_key=True, index=True) + user_id: int = Field( + sa_column=Column(BigInteger, ForeignKey("lazer_users.id")), exclude=True + ) + user: "User" = Relationship(back_populates="achievement") + + +class UserAchievementResp(UserAchievementBase): + @classmethod + def from_db(cls, db_model: UserAchievement) -> "UserAchievementResp": + return cls.model_validate(db_model) diff --git a/app/database/daily_challenge.py b/app/database/daily_challenge.py new file mode 100644 index 0000000..abf874f --- /dev/null +++ b/app/database/daily_challenge.py @@ -0,0 +1,58 @@ +from datetime import datetime +from typing import TYPE_CHECKING + +from app.models.model import UTCBaseModel + +from sqlmodel import ( + BigInteger, + Column, + DateTime, + Field, + ForeignKey, + Relationship, + SQLModel, +) + +if TYPE_CHECKING: + from .lazer_user import User + + +class DailyChallengeStatsBase(SQLModel, UTCBaseModel): + daily_streak_best: int = Field(default=0) + daily_streak_current: int = Field(default=0) + last_update: datetime | None = Field(default=None, sa_column=Column(DateTime)) + last_weekly_streak: datetime | None = Field( + default=None, sa_column=Column(DateTime) + ) + playcount: int = Field(default=0) + top_10p_placements: int = Field(default=0) + top_50p_placements: int = Field(default=0) + weekly_streak_best: int = Field(default=0) + weekly_streak_current: int = Field(default=0) + + +class DailyChallengeStats(DailyChallengeStatsBase, table=True): + __tablename__ = "daily_challenge_stats" # pyright: ignore[reportAssignmentType] + + user_id: int | None = Field( + default=None, + sa_column=Column( + BigInteger, + ForeignKey("lazer_users.id"), + unique=True, + index=True, + primary_key=True, + ), + ) + user: "User" = Relationship(back_populates="daily_challenge_stats") + + +class DailyChallengeStatsResp(DailyChallengeStatsBase): + user_id: int + + @classmethod + def from_db( + cls, + obj: DailyChallengeStats, + ) -> "DailyChallengeStatsResp": + return cls.model_validate(obj) diff --git a/app/database/lazer_user.py b/app/database/lazer_user.py new file mode 100644 index 0000000..9b98c98 --- /dev/null +++ b/app/database/lazer_user.py @@ -0,0 +1,330 @@ +from datetime import UTC, datetime +from typing import TYPE_CHECKING, NotRequired, TypedDict + +from app.models.model import UTCBaseModel +from app.models.score import GameMode +from app.models.user import Country, Page, RankHistory + +from .achievement import UserAchievement, UserAchievementResp +from .daily_challenge import DailyChallengeStats, DailyChallengeStatsResp +from .monthly_playcounts import MonthlyPlaycounts, MonthlyPlaycountsResp +from .statistics import UserStatistics, UserStatisticsResp +from .team import Team, TeamMember +from .user_account_history import UserAccountHistory, UserAccountHistoryResp + +from sqlalchemy.orm import joinedload, selectinload +from sqlmodel import ( + JSON, + BigInteger, + Column, + DateTime, + Field, + Relationship, + SQLModel, + func, + select, +) +from sqlmodel.ext.asyncio.session import AsyncSession + +if TYPE_CHECKING: + from app.database.relationship import RelationshipResp + + +class Kudosu(TypedDict): + available: int + total: int + + +class RankHighest(TypedDict): + rank: int + updated_at: datetime + + +class UserProfileCover(TypedDict): + url: str + custom_url: NotRequired[str] + id: NotRequired[str] + + +Badge = TypedDict( + "Badge", + { + "awarded_at": datetime, + "description": str, + "image@2x_url": str, + "image_url": str, + "url": str, + }, +) + + +class UserBase(UTCBaseModel, SQLModel): + avatar_url: str = "" + country_code: str = Field(default="CN", max_length=2, index=True) + # ? default_group: str|None + is_active: bool = True + is_bot: bool = False + is_supporter: bool = False + last_visit: datetime = Field( + default=datetime.now(UTC), sa_column=Column(DateTime(timezone=True)) + ) + pm_friends_only: bool = False + profile_colour: str | None = None + username: str = Field(max_length=32, unique=True, index=True) + page: Page = Field(sa_column=Column(JSON), default=Page(html="", raw="")) + previous_usernames: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + # TODO: replays_watched_counts + support_level: int = 0 + badges: list[Badge] = Field(default_factory=list, sa_column=Column(JSON)) + + # optional + is_restricted: bool = False + # blocks + cover: UserProfileCover = Field( + default=UserProfileCover( + url="https://assets.ppy.sh/user-profile-covers/default.jpeg" + ), + sa_column=Column(JSON), + ) + beatmap_playcounts_count: int = 0 + # kudosu + + # UserExtended + playmode: GameMode = GameMode.OSU + discord: str | None = None + has_supported: bool = False + interests: str | None = None + join_date: datetime = Field(default=datetime.now(UTC)) + location: str | None = None + max_blocks: int = 50 + max_friends: int = 500 + occupation: str | None = None + playstyle: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + # TODO: post_count + profile_hue: int | None = None + profile_order: list[str] = Field( + default_factory=lambda: [ + "me", + "recent_activity", + "top_ranks", + "medals", + "historical", + "beatmaps", + "kudosu", + ], + sa_column=Column(JSON), + ) + title: str | None = None + title_url: str | None = None + twitter: str | None = None + website: str | None = None + + # undocumented + comments_count: int = 0 + post_count: int = 0 + is_admin: bool = False + is_gmt: bool = False + is_qat: bool = False + is_bng: bool = False + + +class User(UserBase, table=True): + __tablename__ = "lazer_users" # pyright: ignore[reportAssignmentType] + + id: int | None = Field( + default=None, + sa_column=Column(BigInteger, primary_key=True, autoincrement=True, index=True), + ) + account_history: list[UserAccountHistory] = Relationship() + statistics: list[UserStatistics] = Relationship() + achievement: list[UserAchievement] = Relationship(back_populates="user") + team_membership: TeamMember | None = Relationship(back_populates="user") + daily_challenge_stats: DailyChallengeStats | None = Relationship( + back_populates="user" + ) + monthly_playcounts: list[MonthlyPlaycounts] = Relationship(back_populates="user") + + email: str = Field(max_length=254, unique=True, index=True, exclude=True) + priv: int = Field(default=1, exclude=True) + pw_bcrypt: str = Field(max_length=60, exclude=True) + silence_end_at: datetime | None = Field( + default=None, sa_column=Column(DateTime(timezone=True)), exclude=True + ) + donor_end_at: datetime | None = Field( + default=None, sa_column=Column(DateTime(timezone=True)), exclude=True + ) + + @classmethod + def all_select_option(cls): + return ( + selectinload(cls.account_history), # pyright: ignore[reportArgumentType] + selectinload(cls.statistics), # pyright: ignore[reportArgumentType] + selectinload(cls.achievement), # pyright: ignore[reportArgumentType] + joinedload(cls.team_membership).joinedload(TeamMember.team), # pyright: ignore[reportArgumentType] + joinedload(cls.daily_challenge_stats), # pyright: ignore[reportArgumentType] + selectinload(cls.monthly_playcounts), # pyright: ignore[reportArgumentType] + ) + + +class UserResp(UserBase): + id: int | None = None + is_online: bool = True # TODO + groups: list = [] # TODO + country: Country = Field(default_factory=lambda: Country(code="CN", name="China")) + favourite_beatmapset_count: int = 0 # TODO + graveyard_beatmapset_count: int = 0 # TODO + guest_beatmapset_count: int = 0 # TODO + loved_beatmapset_count: int = 0 # TODO + mapping_follower_count: int = 0 # TODO + nominated_beatmapset_count: int = 0 # TODO + pending_beatmapset_count: int = 0 # TODO + ranked_beatmapset_count: int = 0 # TODO + follow_user_mapping: list[int] = Field(default_factory=list) + follower_count: int = 0 + friends: list["RelationshipResp"] | None = None + scores_best_count: int = 0 + scores_first_count: int = 0 + scores_recent_count: int = 0 + scores_pinned_count: int = 0 + account_history: list[UserAccountHistoryResp] = [] + active_tournament_banners: list[dict] = [] # TODO + kudosu: Kudosu = Field(default_factory=lambda: Kudosu(available=0, total=0)) # TODO + monthly_playcounts: list[MonthlyPlaycountsResp] = Field(default_factory=list) + unread_pm_count: int = 0 # TODO + rank_history: RankHistory | None = None # TODO + rank_highest: RankHighest | None = None # TODO + statistics: UserStatisticsResp | None = None + statistics_rulesets: dict[str, UserStatisticsResp] | None = None + user_achievements: list[UserAchievementResp] = Field(default_factory=list) + cover_url: str = "" # deprecated + team: Team | None = None + session_verified: bool = True + daily_challenge_user_stats: DailyChallengeStatsResp | None = None + + # TODO: monthly_playcounts, unread_pm_count, rank_history, user_preferences + + @classmethod + async def from_db( + cls, + obj: User, + session: AsyncSession, + include: list[str] = [], + ruleset: GameMode | None = None, + ) -> "UserResp": + from .best_score import BestScore + from .relationship import Relationship, RelationshipResp, RelationshipType + + u = cls.model_validate(obj.model_dump()) + u.id = obj.id + u.follower_count = ( + await session.exec( + select(func.count()) + .select_from(Relationship) + .where( + Relationship.target_id == obj.id, + Relationship.type == RelationshipType.FOLLOW, + ) + ) + ).one() + u.scores_best_count = ( + await session.exec( + select(func.count()) + .select_from(BestScore) + .where( + BestScore.user_id == obj.id, + ) + .limit(200) + ) + ).one() + u.cover_url = ( + obj.cover.get( + "url", "https://assets.ppy.sh/user-profile-covers/default.jpeg" + ) + if obj.cover + else "https://assets.ppy.sh/user-profile-covers/default.jpeg" + ) + + if "friends" in include: + u.friends = [ + await RelationshipResp.from_db(session, r) + for r in ( + await session.exec( + select(Relationship) + .options( + joinedload(Relationship.target).options( # pyright: ignore[reportArgumentType] + *User.all_select_option() + ) + ) + .where( + Relationship.user_id == obj.id, + Relationship.type == RelationshipType.FOLLOW, + ) + ) + ).all() + ] + + if "team" in include: + if obj.team_membership: + u.team = obj.team_membership.team + + if "account_history" in include: + u.account_history = [ + UserAccountHistoryResp.from_db(ah) for ah in obj.account_history + ] + + if "daily_challenge_user_stats": + if obj.daily_challenge_stats: + u.daily_challenge_user_stats = DailyChallengeStatsResp.from_db( + obj.daily_challenge_stats + ) + + if "statistics" in include: + current_stattistics = None + for i in obj.statistics: + if i.mode == (ruleset or obj.playmode): + current_stattistics = i + break + u.statistics = ( + UserStatisticsResp.from_db(current_stattistics) + if current_stattistics + else None + ) + + if "statistics_rulesets" in include: + u.statistics_rulesets = { + i.mode.value: UserStatisticsResp.from_db(i) for i in obj.statistics + } + + if "monthly_playcounts" in include: + u.monthly_playcounts = [ + MonthlyPlaycountsResp.from_db(pc) for pc in obj.monthly_playcounts + ] + + if "achievements" in include: + u.user_achievements = [ + UserAchievementResp.from_db(ua) for ua in obj.achievement + ] + + return u + + +ALL_INCLUDED = [ + "friends", + "team", + "account_history", + "daily_challenge_user_stats", + "statistics", + "statistics_rulesets", + "achievements", + "monthly_playcounts", +] + + +SEARCH_INCLUDED = [ + "team", + "daily_challenge_user_stats", + "statistics", + "statistics_rulesets", + "achievements", + "monthly_playcounts", +] diff --git a/app/database/monthly_playcounts.py b/app/database/monthly_playcounts.py new file mode 100644 index 0000000..46192d1 --- /dev/null +++ b/app/database/monthly_playcounts.py @@ -0,0 +1,43 @@ +from datetime import date +from typing import TYPE_CHECKING + +from sqlmodel import ( + BigInteger, + Column, + Field, + ForeignKey, + Relationship, + SQLModel, +) + +if TYPE_CHECKING: + from .lazer_user import User + + +class MonthlyPlaycounts(SQLModel, table=True): + __tablename__ = "monthly_playcounts" # pyright: ignore[reportAssignmentType] + + id: int | None = Field( + default=None, + sa_column=Column(BigInteger, primary_key=True, autoincrement=True), + ) + user_id: int = Field( + sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), index=True) + ) + year: int = Field(index=True) + month: int = Field(index=True) + playcount: int = Field(default=0) + + user: "User" = Relationship(back_populates="monthly_playcounts") + + +class MonthlyPlaycountsResp(SQLModel): + start_date: date + count: int + + @classmethod + def from_db(cls, db_model: MonthlyPlaycounts) -> "MonthlyPlaycountsResp": + return cls( + start_date=date(db_model.year, db_model.month, 1), + count=db_model.playcount, + ) diff --git a/app/database/statistics.py b/app/database/statistics.py new file mode 100644 index 0000000..cac2971 --- /dev/null +++ b/app/database/statistics.py @@ -0,0 +1,95 @@ +from typing import TYPE_CHECKING + +from app.models.score import GameMode + +from sqlmodel import ( + BigInteger, + Column, + Field, + ForeignKey, + Relationship, + SQLModel, +) + +if TYPE_CHECKING: + from .lazer_user import User + + +class UserStatisticsBase(SQLModel): + mode: GameMode + count_100: int = Field(default=0, sa_column=Column(BigInteger)) + count_300: int = Field(default=0, sa_column=Column(BigInteger)) + count_50: int = Field(default=0, sa_column=Column(BigInteger)) + count_miss: int = Field(default=0, sa_column=Column(BigInteger)) + + global_rank: int | None = Field(default=None) + country_rank: int | None = Field(default=None) + + pp: float = Field(default=0.0) + ranked_score: int = Field(default=0) + hit_accuracy: float = Field(default=0.00) + total_score: int = Field(default=0, sa_column=Column(BigInteger)) + total_hits: int = Field(default=0, sa_column=Column(BigInteger)) + maximum_combo: int = Field(default=0) + + play_count: int = Field(default=0) + play_time: int = Field(default=0, sa_column=Column(BigInteger)) + replays_watched_by_others: int = Field(default=0) + is_ranked: bool = Field(default=True) + + +class UserStatistics(UserStatisticsBase, table=True): + __tablename__ = "lazer_user_statistics" # pyright: ignore[reportAssignmentType] + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field( + default=None, + sa_column=Column( + BigInteger, + ForeignKey("lazer_users.id"), + index=True, + ), + ) + grade_ss: int = Field(default=0) + grade_ssh: int = Field(default=0) + grade_s: int = Field(default=0) + grade_sh: int = Field(default=0) + grade_a: int = Field(default=0) + + level_current: int = Field(default=1) + level_progress: int = Field(default=0) + + user: "User" = Relationship(back_populates="statistics") # type: ignore[valid-type] + + +class UserStatisticsResp(UserStatisticsBase): + grade_counts: dict[str, int] = Field( + default_factory=lambda: { + "ss": 0, + "ssh": 0, + "s": 0, + "sh": 0, + "a": 0, + } + ) + level: dict[str, int] = Field( + default_factory=lambda: { + "current": 1, + "progress": 0, + } + ) + + @classmethod + def from_db(cls, obj: UserStatistics) -> "UserStatisticsResp": + s = cls.model_validate(obj) + s.grade_counts = { + "ss": obj.grade_ss, + "ssh": obj.grade_ssh, + "s": obj.grade_s, + "sh": obj.grade_sh, + "a": obj.grade_a, + } + s.level = { + "current": obj.level_current, + "progress": obj.level_progress, + } + return s diff --git a/app/database/user_account_history.py b/app/database/user_account_history.py new file mode 100644 index 0000000..217c8eb --- /dev/null +++ b/app/database/user_account_history.py @@ -0,0 +1,45 @@ +from datetime import UTC, datetime +from enum import Enum + +from app.models.model import UTCBaseModel + +from sqlmodel import BigInteger, Column, Field, ForeignKey, Integer, SQLModel + + +class UserAccountHistoryType(str, Enum): + NOTE = "note" + RESTRICTION = "restriction" + SLIENCE = "silence" + TOURNAMENT_BAN = "tournament_ban" + + +class UserAccountHistoryBase(SQLModel, UTCBaseModel): + description: str | None = None + length: int + permanent: bool = False + timestamp: datetime = Field(default=datetime.now(UTC)) + type: UserAccountHistoryType + + +class UserAccountHistory(UserAccountHistoryBase, table=True): + __tablename__ = "user_account_history" # pyright: ignore[reportAssignmentType] + + id: int | None = Field( + sa_column=Column( + Integer, + autoincrement=True, + index=True, + primary_key=True, + ) + ) + user_id: int = Field( + sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), index=True) + ) + + +class UserAccountHistoryResp(UserAccountHistoryBase): + id: int | None = None + + @classmethod + def from_db(cls, db_model: UserAccountHistory) -> "UserAccountHistoryResp": + return cls.model_validate(db_model) diff --git a/app/models/model.py b/app/models/model.py new file mode 100644 index 0000000..bc00585 --- /dev/null +++ b/app/models/model.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from datetime import UTC, datetime + +from pydantic import BaseModel, field_serializer + + +class UTCBaseModel(BaseModel): + @field_serializer("*", when_used="json") + def serialize_datetime(self, v, _info): + if isinstance(v, datetime): + if v.tzinfo is None: + v = v.replace(tzinfo=UTC) + return v.astimezone(UTC).isoformat() + return v