chore(merge): merge branch 'main' into feat/multiplayer-api

This commit is contained in:
MingxuanGame
2025-08-01 05:24:12 +00:00
39 changed files with 971 additions and 2191 deletions

View File

@@ -1,3 +1,4 @@
from .achievement import UserAchievement, UserAchievementResp
from .auth import OAuthToken
from .beatmap import (
Beatmap as Beatmap,
@@ -8,7 +9,13 @@ from .beatmapset import (
BeatmapsetResp as BeatmapsetResp,
)
from .best_score import BestScore
from .legacy import LegacyOAuthToken, LegacyUserStatistics
from .daily_challenge import DailyChallengeStats, DailyChallengeStatsResp
from .favourite_beatmapset import FavouriteBeatmapset
from .lazer_user import (
User,
UserResp,
)
from .pp_best_score import PPBestScore
from .relationship import Relationship, RelationshipResp, RelationshipType
from .score import (
Score,
@@ -17,52 +24,27 @@ from .score import (
ScoreStatistics,
)
from .score_token import ScoreToken, ScoreTokenResp
from .statistics import (
UserStatistics,
UserStatisticsResp,
)
from .team import Team, TeamMember
from .user import (
DailyChallengeStats,
LazerUserAchievement,
LazerUserBadge,
LazerUserBanners,
LazerUserCountry,
LazerUserCounts,
LazerUserKudosu,
LazerUserMonthlyPlaycounts,
LazerUserPreviousUsername,
LazerUserProfile,
LazerUserProfileSections,
LazerUserReplaysWatched,
LazerUserStatistics,
RankHistory,
User,
UserAchievement,
UserAvatar,
from .user_account_history import (
UserAccountHistory,
UserAccountHistoryResp,
UserAccountHistoryType,
)
BeatmapsetResp.model_rebuild()
BeatmapResp.model_rebuild()
__all__ = [
"Beatmap",
"BeatmapResp",
"Beatmapset",
"BeatmapsetResp",
"BestScore",
"DailyChallengeStats",
"LazerUserAchievement",
"LazerUserBadge",
"LazerUserBanners",
"LazerUserCountry",
"LazerUserCounts",
"LazerUserKudosu",
"LazerUserMonthlyPlaycounts",
"LazerUserPreviousUsername",
"LazerUserProfile",
"LazerUserProfileSections",
"LazerUserReplaysWatched",
"LazerUserStatistics",
"LegacyOAuthToken",
"LegacyUserStatistics",
"DailyChallengeStatsResp",
"FavouriteBeatmapset",
"OAuthToken",
"RankHistory",
"PPBestScore",
"Relationship",
"RelationshipResp",
"RelationshipType",
@@ -75,6 +57,17 @@ __all__ = [
"Team",
"TeamMember",
"User",
"UserAccountHistory",
"UserAccountHistoryResp",
"UserAccountHistoryType",
"UserAchievement",
"UserAvatar",
"UserAchievement",
"UserAchievementResp",
"UserResp",
"UserStatistics",
"UserStatisticsResp",
]
for i in __all__:
if i.endswith("Resp"):
globals()[i].model_rebuild() # type: ignore[call-arg]

View File

@@ -1,19 +1,21 @@
from datetime import datetime
from typing import TYPE_CHECKING
from app.models.model import UTCBaseModel
from sqlalchemy import Column, DateTime
from sqlmodel import BigInteger, Field, ForeignKey, Relationship, SQLModel
if TYPE_CHECKING:
from .user import User
from .lazer_user import User
class OAuthToken(SQLModel, table=True):
class OAuthToken(UTCBaseModel, SQLModel, table=True):
__tablename__ = "oauth_tokens" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(
sa_column=Column(BigInteger, ForeignKey("users.id"), index=True)
sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), index=True)
)
access_token: str = Field(max_length=500, unique=True)
refresh_token: str = Field(max_length=500, unique=True)

View File

@@ -7,13 +7,14 @@ from app.models.score import MODE_TO_INT, GameMode
from .beatmapset import Beatmapset, BeatmapsetResp
from sqlalchemy import DECIMAL, Column, DateTime
from sqlalchemy.orm import joinedload
from sqlmodel import VARCHAR, Field, Relationship, SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession
if TYPE_CHECKING:
from app.fetcher import Fetcher
from .lazer_user import User
class BeatmapOwner(SQLModel):
id: int
@@ -66,7 +67,9 @@ class Beatmap(BeatmapBase, table=True):
beatmapset_id: int = Field(foreign_key="beatmapsets.id", index=True)
beatmap_status: BeatmapRankStatus
# optional
beatmapset: Beatmapset = Relationship(back_populates="beatmaps")
beatmapset: Beatmapset = Relationship(
back_populates="beatmaps", sa_relationship_kwargs={"lazy": "joined"}
)
@property
def can_ranked(self) -> bool:
@@ -87,13 +90,7 @@ class Beatmap(BeatmapBase, table=True):
session.add(beatmap)
await session.commit()
beatmap = (
await session.exec(
select(Beatmap)
.options(
joinedload(Beatmap.beatmapset).selectinload(Beatmapset.beatmaps) # pyright: ignore[reportArgumentType]
)
.where(Beatmap.id == resp.id)
)
await session.exec(select(Beatmap).where(Beatmap.id == resp.id))
).first()
assert beatmap is not None, "Beatmap should not be None after commit"
return beatmap
@@ -131,13 +128,9 @@ class Beatmap(BeatmapBase, table=True):
) -> "Beatmap":
beatmap = (
await session.exec(
select(Beatmap)
.where(
select(Beatmap).where(
Beatmap.id == bid if bid is not None else Beatmap.checksum == md5
)
.options(
joinedload(Beatmap.beatmapset).selectinload(Beatmapset.beatmaps) # pyright: ignore[reportArgumentType]
)
)
).first()
if not beatmap:
@@ -164,11 +157,13 @@ class BeatmapResp(BeatmapBase):
url: str = ""
@classmethod
def from_db(
async def from_db(
cls,
beatmap: Beatmap,
query_mode: GameMode | None = None,
from_set: bool = False,
session: AsyncSession | None = None,
user: "User | None" = None,
) -> "BeatmapResp":
beatmap_ = beatmap.model_dump()
if query_mode is not None and beatmap.mode != query_mode:
@@ -178,5 +173,7 @@ class BeatmapResp(BeatmapBase):
beatmap_["ranked"] = beatmap.beatmap_status.value
beatmap_["mode_int"] = MODE_TO_INT[beatmap.mode]
if not from_set:
beatmap_["beatmapset"] = BeatmapsetResp.from_db(beatmap.beatmapset)
beatmap_["beatmapset"] = await BeatmapsetResp.from_db(
beatmap.beatmapset, session=session, user=user
)
return cls.model_validate(beatmap_)

View File

@@ -4,13 +4,17 @@ from typing import TYPE_CHECKING, TypedDict, cast
from app.models.beatmap import BeatmapRankStatus, Genre, Language
from app.models.score import GameMode
from .lazer_user import BASE_INCLUDES, User, UserResp
from pydantic import BaseModel, model_serializer
from sqlalchemy import DECIMAL, JSON, Column, DateTime, Text
from sqlmodel import Field, Relationship, SQLModel
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlmodel import Field, Relationship, SQLModel, col, func, select
from sqlmodel.ext.asyncio.session import AsyncSession
if TYPE_CHECKING:
from .beatmap import Beatmap, BeatmapResp
from .favourite_beatmapset import FavouriteBeatmapset
class BeatmapCovers(SQLModel):
@@ -88,7 +92,6 @@ class BeatmapsetBase(SQLModel):
artist_unicode: str = Field(index=True)
covers: BeatmapCovers | None = Field(sa_column=Column(JSON))
creator: str
favourite_count: int
nsfw: bool = Field(default=False)
play_count: int
preview_url: str
@@ -112,11 +115,9 @@ class BeatmapsetBase(SQLModel):
pack_tags: list[str] = Field(default=[], sa_column=Column(JSON))
ratings: list[int] = Field(default=None, sa_column=Column(JSON))
# TODO: recent_favourites: Optional[list[User]] = None
# TODO: related_users: Optional[list[User]] = None
# TODO: user: Optional[User] = Field(default=None)
track_id: int | None = Field(default=None) # feature artist?
# TODO: has_favourited
# BeatmapsetExtended
bpm: float = Field(default=0.0, sa_column=Column(DECIMAL(10, 2)))
@@ -129,7 +130,7 @@ class BeatmapsetBase(SQLModel):
tags: str = Field(default="", sa_column=Column(Text))
class Beatmapset(BeatmapsetBase, table=True):
class Beatmapset(AsyncAttrs, BeatmapsetBase, table=True):
__tablename__ = "beatmapsets" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
@@ -150,6 +151,7 @@ class Beatmapset(BeatmapsetBase, table=True):
hype_required: int = Field(default=0)
availability_info: str | None = Field(default=None)
download_disabled: bool = Field(default=False)
favourites: list["FavouriteBeatmapset"] = Relationship(back_populates="beatmapset")
@classmethod
async def from_resp(
@@ -197,40 +199,88 @@ class BeatmapsetResp(BeatmapsetBase):
genre: BeatmapTranslationText | None = None
language: BeatmapTranslationText | None = None
nominations: BeatmapNominations | None = None
has_favourited: bool = False
favourite_count: int = 0
recent_favourites: list[UserResp] = Field(default_factory=list)
@classmethod
def from_db(cls, beatmapset: Beatmapset) -> "BeatmapsetResp":
async def from_db(
cls,
beatmapset: Beatmapset,
include: list[str] = [],
session: AsyncSession | None = None,
user: User | None = None,
) -> "BeatmapsetResp":
from .beatmap import BeatmapResp
from .favourite_beatmapset import FavouriteBeatmapset
beatmaps = [
BeatmapResp.from_db(beatmap, from_set=True)
for beatmap in beatmapset.beatmaps
]
update = {
"beatmaps": [
await BeatmapResp.from_db(beatmap, from_set=True)
for beatmap in await beatmapset.awaitable_attrs.beatmaps
],
"hype": BeatmapHype(
current=beatmapset.hype_current, required=beatmapset.hype_required
),
"availability": BeatmapAvailability(
more_information=beatmapset.availability_info,
download_disabled=beatmapset.download_disabled,
),
"genre": BeatmapTranslationText(
name=beatmapset.beatmap_genre.name,
id=beatmapset.beatmap_genre.value,
),
"language": BeatmapTranslationText(
name=beatmapset.beatmap_language.name,
id=beatmapset.beatmap_language.value,
),
"nominations": BeatmapNominations(
required=beatmapset.nominations_required,
current=beatmapset.nominations_current,
),
"status": beatmapset.beatmap_status.name.lower(),
"ranked": beatmapset.beatmap_status.value,
"is_scoreable": beatmapset.beatmap_status > BeatmapRankStatus.PENDING,
**beatmapset.model_dump(),
}
if session and user:
existing_favourite = (
await session.exec(
select(FavouriteBeatmapset).where(
FavouriteBeatmapset.beatmapset_id == beatmapset.id
)
)
).first()
update["has_favourited"] = existing_favourite is not None
if session and "recent_favourites" in include:
recent_favourites = (
await session.exec(
select(FavouriteBeatmapset)
.where(
FavouriteBeatmapset.beatmapset_id == beatmapset.id,
)
.order_by(col(FavouriteBeatmapset.date).desc())
.limit(50)
)
).all()
update["recent_favourites"] = [
await UserResp.from_db(
await favourite.awaitable_attrs.user,
session=session,
include=BASE_INCLUDES,
)
for favourite in recent_favourites
]
if session:
update["favourite_count"] = (
await session.exec(
select(func.count())
.select_from(FavouriteBeatmapset)
.where(FavouriteBeatmapset.beatmapset_id == beatmapset.id)
)
).one()
return cls.model_validate(
{
"beatmaps": beatmaps,
"hype": BeatmapHype(
current=beatmapset.hype_current, required=beatmapset.hype_required
),
"availability": BeatmapAvailability(
more_information=beatmapset.availability_info,
download_disabled=beatmapset.download_disabled,
),
"genre": BeatmapTranslationText(
name=beatmapset.beatmap_genre.name,
id=beatmapset.beatmap_genre.value,
),
"language": BeatmapTranslationText(
name=beatmapset.beatmap_language.name,
id=beatmapset.beatmap_language.value,
),
"nominations": BeatmapNominations(
required=beatmapset.nominations_required,
current=beatmapset.nominations_current,
),
"status": beatmapset.beatmap_status.name.lower(),
"ranked": beatmapset.beatmap_status.value,
"is_scoreable": beatmapset.beatmap_status > BeatmapRankStatus.PENDING,
**beatmapset.model_dump(),
}
update,
)

View File

@@ -1,14 +1,14 @@
from typing import TYPE_CHECKING
from app.models.score import GameMode
from app.models.score import GameMode, Rank
from .user import User
from .lazer_user import User
from sqlmodel import (
JSON,
BigInteger,
Column,
Field,
Float,
ForeignKey,
Relationship,
SQLModel,
@@ -20,22 +20,29 @@ if TYPE_CHECKING:
class BestScore(SQLModel, table=True):
__tablename__ = "best_scores" # pyright: ignore[reportAssignmentType]
__tablename__ = "total_score_best_scores" # pyright: ignore[reportAssignmentType]
user_id: int = Field(
sa_column=Column(BigInteger, ForeignKey("users.id"), index=True)
sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), index=True)
)
score_id: int = Field(
sa_column=Column(BigInteger, ForeignKey("scores.id"), primary_key=True)
)
beatmap_id: int = Field(foreign_key="beatmaps.id", index=True)
gamemode: GameMode = Field(index=True)
pp: float = Field(
sa_column=Column(Float, default=0),
total_score: int = Field(
default=0, sa_column=Column(BigInteger, ForeignKey("scores.total_score"))
)
acc: float = Field(
sa_column=Column(Float, default=0),
mods: list[str] = Field(
default_factory=list,
sa_column=Column(JSON),
)
rank: Rank
user: User = Relationship()
score: "Score" = Relationship()
score: "Score" = Relationship(
sa_relationship_kwargs={
"foreign_keys": "[BestScore.score_id]",
"lazy": "joined",
}
)
beatmap: "Beatmap" = Relationship()

View File

@@ -0,0 +1,53 @@
import datetime
from app.database.beatmapset import Beatmapset
from app.database.lazer_user import User
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlmodel import (
BigInteger,
Column,
DateTime,
Field,
ForeignKey,
Relationship,
SQLModel,
)
class FavouriteBeatmapset(AsyncAttrs, SQLModel, table=True):
__tablename__ = "favourite_beatmapset" # pyright: ignore[reportAssignmentType]
id: int | None = Field(
default=None,
sa_column=Column(BigInteger, autoincrement=True, primary_key=True),
exclude=True,
)
user_id: int = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("lazer_users.id"),
index=True,
),
)
beatmapset_id: int = Field(
default=None,
sa_column=Column(
ForeignKey("beatmapsets.id"),
index=True,
),
)
date: datetime.datetime = Field(
default=datetime.datetime.now(datetime.UTC),
sa_column=Column(
DateTime,
),
)
user: User = Relationship(back_populates="favourite_beatmapsets")
beatmapset: Beatmapset = Relationship(
sa_relationship_kwargs={
"lazy": "selectin",
},
back_populates="favourites",
)

View File

@@ -12,7 +12,7 @@ from .statistics import UserStatistics, UserStatisticsResp
from .team import Team, TeamMember
from .user_account_history import UserAccountHistory, UserAccountHistoryResp
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlmodel import (
JSON,
BigInteger,
@@ -27,7 +27,8 @@ from sqlmodel import (
from sqlmodel.ext.asyncio.session import AsyncSession
if TYPE_CHECKING:
from app.database.relationship import RelationshipResp
from .favourite_beatmapset import FavouriteBeatmapset
from .relationship import RelationshipResp
class Kudosu(TypedDict):
@@ -128,7 +129,7 @@ class UserBase(UTCBaseModel, SQLModel):
is_bng: bool = False
class User(UserBase, table=True):
class User(AsyncAttrs, UserBase, table=True):
__tablename__ = "lazer_users" # pyright: ignore[reportAssignmentType]
id: int | None = Field(
@@ -143,6 +144,9 @@ class User(UserBase, table=True):
back_populates="user"
)
monthly_playcounts: list[MonthlyPlaycounts] = Relationship(back_populates="user")
favourite_beatmapsets: list["FavouriteBeatmapset"] = Relationship(
back_populates="user"
)
email: str = Field(max_length=254, unique=True, index=True, exclude=True)
priv: int = Field(default=1, exclude=True)
@@ -154,21 +158,10 @@ class User(UserBase, table=True):
default=None, sa_column=Column(DateTime(timezone=True)), exclude=True
)
@classmethod
def all_select_option(cls):
return (
selectinload(cls.account_history), # pyright: ignore[reportArgumentType]
selectinload(cls.statistics), # pyright: ignore[reportArgumentType]
selectinload(cls.achievement), # pyright: ignore[reportArgumentType]
joinedload(cls.team_membership).joinedload(TeamMember.team), # pyright: ignore[reportArgumentType]
joinedload(cls.daily_challenge_stats), # pyright: ignore[reportArgumentType]
selectinload(cls.monthly_playcounts), # pyright: ignore[reportArgumentType]
)
class UserResp(UserBase):
id: int | None = None
is_online: bool = True # TODO
is_online: bool = False
groups: list = [] # TODO
country: Country = Field(default_factory=lambda: Country(code="CN", name="China"))
favourite_beatmapset_count: int = 0 # TODO
@@ -211,6 +204,8 @@ class UserResp(UserBase):
include: list[str] = [],
ruleset: GameMode | None = None,
) -> "UserResp":
from app.dependencies.database import get_redis
from .best_score import BestScore
from .relationship import Relationship, RelationshipResp, RelationshipType
@@ -236,6 +231,8 @@ class UserResp(UserBase):
.limit(200)
)
).one()
redis = get_redis()
u.is_online = await redis.exists(f"metadata:online:{obj.id}")
u.cover_url = (
obj.cover.get(
"url", "https://assets.ppy.sh/user-profile-covers/default.jpeg"
@@ -249,13 +246,7 @@ class UserResp(UserBase):
await RelationshipResp.from_db(session, r)
for r in (
await session.exec(
select(Relationship)
.options(
joinedload(Relationship.target).options( # pyright: ignore[reportArgumentType]
*User.all_select_option()
)
)
.where(
select(Relationship).where(
Relationship.user_id == obj.id,
Relationship.type == RelationshipType.FOLLOW,
)
@@ -264,23 +255,26 @@ class UserResp(UserBase):
]
if "team" in include:
if obj.team_membership:
if await obj.awaitable_attrs.team_membership:
assert obj.team_membership
u.team = obj.team_membership.team
if "account_history" in include:
u.account_history = [
UserAccountHistoryResp.from_db(ah) for ah in obj.account_history
UserAccountHistoryResp.from_db(ah)
for ah in await obj.awaitable_attrs.account_history
]
if "daily_challenge_user_stats":
if obj.daily_challenge_stats:
if await obj.awaitable_attrs.daily_challenge_stats:
assert obj.daily_challenge_stats
u.daily_challenge_user_stats = DailyChallengeStatsResp.from_db(
obj.daily_challenge_stats
)
if "statistics" in include:
current_stattistics = None
for i in obj.statistics:
for i in await obj.awaitable_attrs.statistics:
if i.mode == (ruleset or obj.playmode):
current_stattistics = i
break
@@ -292,17 +286,20 @@ class UserResp(UserBase):
if "statistics_rulesets" in include:
u.statistics_rulesets = {
i.mode.value: UserStatisticsResp.from_db(i) for i in obj.statistics
i.mode.value: UserStatisticsResp.from_db(i)
for i in await obj.awaitable_attrs.statistics
}
if "monthly_playcounts" in include:
u.monthly_playcounts = [
MonthlyPlaycountsResp.from_db(pc) for pc in obj.monthly_playcounts
MonthlyPlaycountsResp.from_db(pc)
for pc in await obj.awaitable_attrs.monthly_playcounts
]
if "achievements" in include:
u.user_achievements = [
UserAchievementResp.from_db(ua) for ua in obj.achievement
UserAchievementResp.from_db(ua)
for ua in await obj.awaitable_attrs.achievement
]
return u
@@ -328,3 +325,9 @@ SEARCH_INCLUDED = [
"achievements",
"monthly_playcounts",
]
BASE_INCLUDES = [
"team",
"daily_challenge_user_stats",
"statistics",
]

View File

@@ -1,94 +0,0 @@
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import JSON, Column, DateTime
from sqlalchemy.orm import Mapped
from sqlmodel import BigInteger, Field, ForeignKey, Relationship, SQLModel
if TYPE_CHECKING:
from .user import User
# ============================================
# 旧的兼容性表模型(保留以便向后兼容)
# ============================================
class LegacyUserStatistics(SQLModel, table=True):
__tablename__ = "user_statistics" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
mode: str = Field(max_length=10) # osu, taiko, fruits, mania
# 基本统计
count_100: int = Field(default=0)
count_300: int = Field(default=0)
count_50: int = Field(default=0)
count_miss: int = Field(default=0)
# 等级信息
level_current: int = Field(default=1)
level_progress: int = Field(default=0)
# 排名信息
global_rank: int | None = Field(default=None)
global_rank_exp: int | None = Field(default=None)
country_rank: int | None = Field(default=None)
# PP 和分数
pp: float = Field(default=0.0)
pp_exp: float = Field(default=0.0)
ranked_score: int = Field(default=0)
hit_accuracy: float = Field(default=0.0)
total_score: int = Field(default=0)
total_hits: int = Field(default=0)
maximum_combo: int = Field(default=0)
# 游戏统计
play_count: int = Field(default=0)
play_time: int = Field(default=0)
replays_watched_by_others: int = Field(default=0)
is_ranked: bool = Field(default=False)
# 成绩等级计数
grade_ss: int = Field(default=0)
grade_ssh: int = Field(default=0)
grade_s: int = Field(default=0)
grade_sh: int = Field(default=0)
grade_a: int = Field(default=0)
# 最高排名记录
rank_highest: int | None = Field(default=None)
rank_highest_updated_at: datetime | None = Field(
default=None, sa_column=Column(DateTime)
)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
# 关联关系
user: Mapped["User"] = Relationship(back_populates="statistics")
class LegacyOAuthToken(SQLModel, table=True):
__tablename__ = "legacy_oauth_tokens" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
access_token: str = Field(max_length=255, index=True)
refresh_token: str = Field(max_length=255, index=True)
expires_at: datetime = Field(sa_column=Column(DateTime))
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
previous_usernames: list = Field(default_factory=list, sa_column=Column(JSON))
replays_watched_counts: list = Field(default_factory=list, sa_column=Column(JSON))
# 用户关系
user: "User" = Relationship()

View File

@@ -0,0 +1,41 @@
from typing import TYPE_CHECKING
from app.models.score import GameMode
from .lazer_user import User
from sqlmodel import (
BigInteger,
Column,
Field,
Float,
ForeignKey,
Relationship,
SQLModel,
)
if TYPE_CHECKING:
from .beatmap import Beatmap
from .score import Score
class PPBestScore(SQLModel, table=True):
__tablename__ = "best_scores" # pyright: ignore[reportAssignmentType]
user_id: int = Field(
sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), index=True)
)
score_id: int = Field(
sa_column=Column(BigInteger, ForeignKey("scores.id"), primary_key=True)
)
beatmap_id: int = Field(foreign_key="beatmaps.id", index=True)
gamemode: GameMode = Field(index=True)
pp: float = Field(
sa_column=Column(Float, default=0),
)
acc: float = Field(
sa_column=Column(Float, default=0),
)
user: User = Relationship()
score: "Score" = Relationship()
beatmap: "Beatmap" = Relationship()

View File

@@ -1,8 +1,6 @@
from enum import Enum
from app.models.user import User as APIUser
from .user import User as DBUser
from .lazer_user import User, UserResp
from pydantic import BaseModel
from sqlmodel import (
@@ -24,12 +22,16 @@ class RelationshipType(str, Enum):
class Relationship(SQLModel, table=True):
__tablename__ = "relationship" # pyright: ignore[reportAssignmentType]
id: int | None = Field(
default=None,
sa_column=Column(BigInteger, autoincrement=True, primary_key=True),
exclude=True,
)
user_id: int = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
primary_key=True,
ForeignKey("lazer_users.id"),
index=True,
),
)
@@ -37,20 +39,22 @@ class Relationship(SQLModel, table=True):
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
primary_key=True,
ForeignKey("lazer_users.id"),
index=True,
),
)
type: RelationshipType = Field(default=RelationshipType.FOLLOW, nullable=False)
target: DBUser = SQLRelationship(
sa_relationship_kwargs={"foreign_keys": "[Relationship.target_id]"}
target: User = SQLRelationship(
sa_relationship_kwargs={
"foreign_keys": "[Relationship.target_id]",
"lazy": "selectin",
}
)
class RelationshipResp(BaseModel):
target_id: int
target: APIUser
target: UserResp
mutual: bool = False
type: RelationshipType
@@ -58,8 +62,6 @@ class RelationshipResp(BaseModel):
async def from_db(
cls, session: AsyncSession, relationship: Relationship
) -> "RelationshipResp":
from app.utils import convert_db_user_to_api_user
target_relationship = (
await session.exec(
select(Relationship).where(
@@ -75,7 +77,16 @@ class RelationshipResp(BaseModel):
)
return cls(
target_id=relationship.target_id,
target=await convert_db_user_to_api_user(relationship.target),
target=await UserResp.from_db(
relationship.target,
session,
include=[
"team",
"daily_challenge_user_stats",
"statistics",
"statistics_rulesets",
],
),
mutual=mutual,
type=relationship.type,
)

View File

@@ -1,6 +1,7 @@
import asyncio
from collections.abc import Sequence
from datetime import UTC, datetime
from datetime import UTC, date, datetime
import json
import math
from typing import TYPE_CHECKING
@@ -12,9 +13,8 @@ from app.calculator import (
calculate_weighted_pp,
clamp,
)
from app.database.score_token import ScoreToken
from app.database.user import LazerUserStatistics, User
from app.models.beatmap import BeatmapRankStatus
from app.database.team import TeamMember
from app.models.model import UTCBaseModel
from app.models.mods import APIMod, mods_can_get_pp
from app.models.score import (
INT_TO_MODE,
@@ -26,15 +26,24 @@ from app.models.score import (
ScoreStatistics,
SoloScoreSubmissionInfo,
)
from app.models.user import User as APIUser
from .beatmap import Beatmap, BeatmapResp
from .beatmapset import Beatmapset, BeatmapsetResp
from .beatmapset import BeatmapsetResp
from .best_score import BestScore
from .lazer_user import User, UserResp
from .monthly_playcounts import MonthlyPlaycounts
from .pp_best_score import PPBestScore
from .relationship import (
Relationship as DBRelationship,
RelationshipType,
)
from .score_token import ScoreToken
from redis import Redis
from redis.asyncio import Redis
from sqlalchemy import Column, ColumnExpressionArgument, DateTime
from sqlalchemy.orm import aliased, joinedload
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import aliased
from sqlalchemy.sql.elements import ColumnElement
from sqlmodel import (
JSON,
BigInteger,
@@ -43,9 +52,10 @@ from sqlmodel import (
Relationship,
SQLModel,
col,
false,
func,
select,
text,
true,
)
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.sql._expression_select_cls import SelectOfScalar
@@ -54,7 +64,7 @@ if TYPE_CHECKING:
from app.fetcher import Fetcher
class ScoreBase(SQLModel):
class ScoreBase(AsyncAttrs, SQLModel, UTCBaseModel):
# 基本字段
accuracy: float
map_md5: str = Field(max_length=32, index=True)
@@ -94,7 +104,7 @@ class Score(ScoreBase, table=True):
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
ForeignKey("lazer_users.id"),
index=True,
),
)
@@ -112,28 +122,13 @@ class Score(ScoreBase, table=True):
gamemode: GameMode = Field(index=True)
# optional
beatmap: "Beatmap" = Relationship()
user: "User" = Relationship()
beatmap: Beatmap = Relationship()
user: User = Relationship(sa_relationship_kwargs={"lazy": "joined"})
@property
def is_perfect_combo(self) -> bool:
return self.max_combo == self.beatmap.max_combo
@staticmethod
def select_clause(with_user: bool = True) -> SelectOfScalar["Score"]:
clause = select(Score).options(
joinedload(Score.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
),
)
if with_user:
return clause.options(
joinedload(Score.user).options(*User.all_select_option()) # pyright: ignore[reportArgumentType]
)
return clause
@staticmethod
def select_clause_unique(
*where_clauses: ColumnExpressionArgument[bool] | bool,
@@ -147,18 +142,7 @@ class Score(ScoreBase, table=True):
)
subq = select(Score, rownum).where(*where_clauses).subquery()
best = aliased(Score, subq, adapt_on_names=True)
return (
select(best)
.where(subq.c.rn == 1)
.options(
joinedload(best.beatmap) # pyright: ignore[reportArgumentType]
.joinedload(Beatmap.beatmapset) # pyright: ignore[reportArgumentType]
.selectinload(
Beatmapset.beatmaps # pyright: ignore[reportArgumentType]
),
joinedload(best.user).options(*User.all_select_option()), # pyright: ignore[reportArgumentType]
)
)
return select(best).where(subq.c.rn == 1)
class ScoreResp(ScoreBase):
@@ -173,22 +157,21 @@ class ScoreResp(ScoreBase):
ruleset_id: int | None = None
beatmap: BeatmapResp | None = None
beatmapset: BeatmapsetResp | None = None
user: APIUser | None = None
user: UserResp | None = None
statistics: ScoreStatistics | None = None
maximum_statistics: ScoreStatistics | None = None
rank_global: int | None = None
rank_country: int | None = None
@classmethod
async def from_db(
cls, session: AsyncSession, score: Score, user: User | None = None
) -> "ScoreResp":
from app.utils import convert_db_user_to_api_user
async def from_db(cls, session: AsyncSession, score: Score) -> "ScoreResp":
s = cls.model_validate(score.model_dump())
assert score.id
s.beatmap = BeatmapResp.from_db(score.beatmap)
s.beatmapset = BeatmapsetResp.from_db(score.beatmap.beatmapset)
await score.awaitable_attrs.beatmap
s.beatmap = await BeatmapResp.from_db(score.beatmap)
s.beatmapset = await BeatmapsetResp.from_db(
score.beatmap.beatmapset, session=session, user=score.user
)
s.is_perfect_combo = s.max_combo == s.beatmap.max_combo
s.legacy_perfect = s.max_combo == s.beatmap.max_combo
s.ruleset_id = MODE_TO_INT[score.gamemode]
@@ -220,25 +203,30 @@ class ScoreResp(ScoreBase):
s.maximum_statistics = {
HitResult.GREAT: score.beatmap.max_combo,
}
if user:
s.user = await convert_db_user_to_api_user(user)
s.user = await UserResp.from_db(
score.user,
session,
include=["statistics", "team", "daily_challenge_user_stats"],
ruleset=score.gamemode,
)
s.rank_global = (
await get_score_position_by_id(
session,
score.map_md5,
score.beatmap_id,
score.id,
mode=score.gamemode,
user=user or score.user,
user=score.user,
)
or None
)
s.rank_country = (
await get_score_position_by_id(
session,
score.map_md5,
score.beatmap_id,
score.id,
score.gamemode,
user or score.user,
score.user,
type=LeaderboardType.COUNTRY,
)
or None
)
@@ -248,135 +236,137 @@ class ScoreResp(ScoreBase):
async def get_best_id(session: AsyncSession, score_id: int) -> None:
rownum = (
func.row_number()
.over(partition_by=col(BestScore.user_id), order_by=col(BestScore.pp).desc())
.over(
partition_by=col(PPBestScore.user_id), order_by=col(PPBestScore.pp).desc()
)
.label("rn")
)
subq = select(BestScore, rownum).subquery()
subq = select(PPBestScore, rownum).subquery()
stmt = select(subq.c.rn).where(subq.c.score_id == score_id)
result = await session.exec(stmt)
return result.one_or_none()
async def _score_where(
type: LeaderboardType,
beatmap: int,
mode: GameMode,
mods: list[str] | None = None,
user: User | None = None,
) -> list[ColumnElement[bool]] | None:
wheres = [
col(BestScore.beatmap_id) == beatmap,
col(BestScore.gamemode) == mode,
]
if type == LeaderboardType.FRIENDS:
if user and user.is_supporter:
subq = (
select(DBRelationship.target_id)
.where(
DBRelationship.type == RelationshipType.FOLLOW,
DBRelationship.user_id == user.id,
)
.subquery()
)
wheres.append(col(BestScore.user_id).in_(select(subq.c.target_id)))
else:
return None
elif type == LeaderboardType.COUNTRY:
if user and user.is_supporter:
wheres.append(
col(BestScore.user).has(col(User.country_code) == user.country_code)
)
else:
return None
elif type == LeaderboardType.TEAM:
if user:
team_membership = await user.awaitable_attrs.team_membership
if team_membership:
team_id = team_membership.team_id
wheres.append(
col(BestScore.user).has(
col(User.team_membership).has(TeamMember.team_id == team_id)
)
)
if mods:
if user and user.is_supporter:
wheres.append(
text(
"JSON_CONTAINS(total_score_best_scores.mods, :w)"
" AND JSON_CONTAINS(:w, total_score_best_scores.mods)"
) # pyright: ignore[reportArgumentType]
)
else:
return None
return wheres
async def get_leaderboard(
session: AsyncSession,
beatmap_md5: str,
beatmap: int,
mode: GameMode,
type: LeaderboardType = LeaderboardType.GLOBAL,
mods: list[APIMod] | None = None,
mods: list[str] | None = None,
user: User | None = None,
limit: int = 50,
) -> list[Score]:
scores = []
if type == LeaderboardType.GLOBAL:
query = (
select(Score)
.where(
col(Beatmap.beatmap_status).in_(
[
BeatmapRankStatus.RANKED,
BeatmapRankStatus.LOVED,
BeatmapRankStatus.QUALIFIED,
BeatmapRankStatus.APPROVED,
]
),
Score.map_md5 == beatmap_md5,
Score.gamemode == mode,
col(Score.passed).is_(True),
Score.mods == mods if user and user.is_supporter else false(),
)
.limit(limit)
.order_by(
col(Score.total_score).desc(),
)
)
result = await session.exec(query)
scores = list[Score](result.all())
elif type == LeaderboardType.FRIENDS and user and user.is_supporter:
# TODO
...
elif type == LeaderboardType.TEAM and user and user.team_membership:
team_id = user.team_membership.team_id
query = (
select(Score)
.join(Beatmap)
.options(joinedload(Score.user)) # pyright: ignore[reportArgumentType]
.where(
Score.map_md5 == beatmap_md5,
Score.gamemode == mode,
col(Score.passed).is_(True),
col(Score.user.team_membership).is_not(None),
Score.user.team_membership.team_id == team_id, # pyright: ignore[reportOptionalMemberAccess]
Score.mods == mods if user and user.is_supporter else false(),
)
.limit(limit)
.order_by(
col(Score.total_score).desc(),
)
)
result = await session.exec(query)
scores = list[Score](result.all())
) -> tuple[list[Score], Score | None]:
wheres = await _score_where(type, beatmap, mode, mods, user)
if wheres is None:
return [], None
query = (
select(BestScore)
.where(*wheres)
.limit(limit)
.order_by(col(BestScore.total_score).desc())
)
if mods:
query = query.params(w=json.dumps(mods))
scores = [s.score for s in await session.exec(query)]
user_score = None
if user:
user_score = (
await session.exec(
select(Score).where(
Score.map_md5 == beatmap_md5,
Score.gamemode == mode,
Score.user_id == user.id,
col(Score.passed).is_(True),
self_query = (
select(BestScore)
.where(BestScore.user_id == user.id)
.order_by(col(BestScore.total_score).desc())
.limit(1)
)
if mods:
self_query = self_query.where(
text(
"JSON_CONTAINS(total_score_best_scores.mods, :w)"
" AND JSON_CONTAINS(:w, total_score_best_scores.mods)"
)
)
).first()
).params(w=json.dumps(mods))
user_bs = (await session.exec(self_query)).first()
if user_bs:
user_score = user_bs.score
if user_score and user_score not in scores:
scores.append(user_score)
return scores
return scores, user_score
async def get_score_position_by_user(
session: AsyncSession,
beatmap_md5: str,
beatmap: int,
user: User,
mode: GameMode,
type: LeaderboardType = LeaderboardType.GLOBAL,
mods: list[APIMod] | None = None,
mods: list[str] | None = None,
) -> int:
where_clause = [
Score.map_md5 == beatmap_md5,
Score.gamemode == mode,
col(Score.passed).is_(True),
col(Beatmap.beatmap_status).in_(
[
BeatmapRankStatus.RANKED,
BeatmapRankStatus.LOVED,
BeatmapRankStatus.QUALIFIED,
BeatmapRankStatus.APPROVED,
]
),
]
if mods and user.is_supporter:
where_clause.append(Score.mods == mods)
else:
where_clause.append(false())
if type == LeaderboardType.FRIENDS and user.is_supporter:
# TODO
...
elif type == LeaderboardType.TEAM and user.team_membership:
team_id = user.team_membership.team_id
where_clause.append(
col(Score.user.team_membership).is_not(None),
)
where_clause.append(
Score.user.team_membership.team_id == team_id, # pyright: ignore[reportOptionalMemberAccess]
)
wheres = await _score_where(type, beatmap, mode, mods, user=user)
if wheres is None:
return 0
rownum = (
func.row_number()
.over(
partition_by=Score.map_md5,
order_by=col(Score.total_score).desc(),
partition_by=col(BestScore.beatmap_id),
order_by=col(BestScore.total_score).desc(),
)
.label("row_number")
)
subq = select(Score, rownum).join(Beatmap).where(*where_clause).subquery()
stmt = select(subq.c.row_number).where(subq.c.user == user)
subq = select(BestScore, rownum).join(Beatmap).where(*wheres).subquery()
stmt = select(subq.c.row_number).where(subq.c.user_id == user.id)
result = await session.exec(stmt)
s = result.one_or_none()
return s if s else 0
@@ -384,57 +374,26 @@ async def get_score_position_by_user(
async def get_score_position_by_id(
session: AsyncSession,
beatmap_md5: str,
beatmap: int,
score_id: int,
mode: GameMode,
user: User | None = None,
type: LeaderboardType = LeaderboardType.GLOBAL,
mods: list[APIMod] | None = None,
mods: list[str] | None = None,
) -> int:
where_clause = [
Score.map_md5 == beatmap_md5,
Score.id == score_id,
Score.gamemode == mode,
col(Score.passed).is_(True),
col(Beatmap.beatmap_status).in_(
[
BeatmapRankStatus.RANKED,
BeatmapRankStatus.LOVED,
BeatmapRankStatus.QUALIFIED,
BeatmapRankStatus.APPROVED,
]
),
]
if mods and user and user.is_supporter:
where_clause.append(Score.mods == mods)
elif mods:
where_clause.append(false())
wheres = await _score_where(type, beatmap, mode, mods, user=user)
if wheres is None:
return 0
rownum = (
func.row_number()
.over(
partition_by=[col(Score.user_id), col(Score.map_md5)],
order_by=col(Score.total_score).desc(),
partition_by=col(BestScore.beatmap_id),
order_by=col(BestScore.total_score).desc(),
)
.label("rownum")
.label("row_number")
)
subq = (
select(Score.user_id, Score.id, Score.total_score, rownum)
.join(Beatmap)
.where(*where_clause)
.subquery()
)
best_scores = aliased(subq)
overall_rank = (
func.rank().over(order_by=best_scores.c.total_score.desc()).label("global_rank")
)
final_q = (
select(best_scores.c.id, overall_rank)
.select_from(best_scores)
.where(best_scores.c.rownum == 1)
.subquery()
)
stmt = select(final_q.c.global_rank).where(final_q.c.id == score_id)
subq = select(BestScore, rownum).join(Beatmap).where(*wheres).subquery()
stmt = select(subq.c.row_number).where(subq.c.score_id == score_id)
result = await session.exec(stmt)
s = result.one_or_none()
return s if s else 0
@@ -445,16 +404,38 @@ async def get_user_best_score_in_beatmap(
beatmap: int,
user: int,
mode: GameMode | None = None,
) -> Score | None:
) -> BestScore | None:
return (
await session.exec(
Score.select_clause(False)
select(BestScore)
.where(
Score.gamemode == mode if mode is not None else True,
Score.beatmap_id == beatmap,
Score.user_id == user,
BestScore.gamemode == mode if mode is not None else true(),
BestScore.beatmap_id == beatmap,
BestScore.user_id == user,
)
.order_by(col(Score.total_score).desc())
.order_by(col(BestScore.total_score).desc())
)
).first()
# FIXME
async def get_user_best_score_with_mod_in_beatmap(
session: AsyncSession,
beatmap: int,
user: int,
mod: list[str],
mode: GameMode | None = None,
) -> BestScore | None:
return (
await session.exec(
select(BestScore)
.where(
BestScore.gamemode == mode if mode is not None else True,
BestScore.beatmap_id == beatmap,
BestScore.user_id == user,
# BestScore.mods == mod,
)
.order_by(col(BestScore.total_score).desc())
)
).first()
@@ -464,13 +445,13 @@ async def get_user_best_pp_in_beatmap(
beatmap: int,
user: int,
mode: GameMode,
) -> BestScore | None:
) -> PPBestScore | None:
return (
await session.exec(
select(BestScore).where(
BestScore.beatmap_id == beatmap,
BestScore.user_id == user,
BestScore.gamemode == mode,
select(PPBestScore).where(
PPBestScore.beatmap_id == beatmap,
PPBestScore.user_id == user,
PPBestScore.gamemode == mode,
)
)
).first()
@@ -480,12 +461,12 @@ async def get_user_best_pp(
session: AsyncSession,
user: int,
limit: int = 200,
) -> Sequence[BestScore]:
) -> Sequence[PPBestScore]:
return (
await session.exec(
select(BestScore)
.where(BestScore.user_id == user)
.order_by(col(BestScore.pp).desc())
select(PPBestScore)
.where(PPBestScore.user_id == user)
.order_by(col(PPBestScore.pp).desc())
.limit(limit)
)
).all()
@@ -494,27 +475,45 @@ async def get_user_best_pp(
async def process_user(
session: AsyncSession, user: User, score: Score, ranked: bool = False
):
assert user.id
assert score.id
mod_for_save = list({mod["acronym"] for mod in score.mods})
previous_score_best = await get_user_best_score_in_beatmap(
session, score.beatmap_id, user.id, score.gamemode
)
statistics = None
previous_score_best_mod = await get_user_best_score_with_mod_in_beatmap(
session, score.beatmap_id, user.id, mod_for_save, score.gamemode
)
add_to_db = False
for i in user.lazer_statistics:
mouthly_playcount = (
await session.exec(
select(MonthlyPlaycounts).where(
MonthlyPlaycounts.user_id == user.id,
MonthlyPlaycounts.year == date.today().year,
MonthlyPlaycounts.month == date.today().month,
)
)
).first()
if mouthly_playcount is None:
mouthly_playcount = MonthlyPlaycounts(
user_id=user.id, year=date.today().year, month=date.today().month
)
add_to_db = True
statistics = None
for i in await user.awaitable_attrs.statistics:
if i.mode == score.gamemode.value:
statistics = i
break
if statistics is None:
statistics = LazerUserStatistics(
mode=score.gamemode.value,
user_id=user.id,
raise ValueError(
f"User {user.id} does not have statistics for mode {score.gamemode.value}"
)
add_to_db = True
# pc, pt, tth, tts
statistics.total_score += score.total_score
difference = (
score.total_score - previous_score_best.total_score
if previous_score_best and previous_score_best.id != score.id
if previous_score_best
else score.total_score
)
if difference > 0 and score.passed and ranked:
@@ -541,11 +540,48 @@ async def process_user(
statistics.grade_sh -= 1
case Rank.A:
statistics.grade_a -= 1
else:
previous_score_best = BestScore(
user_id=user.id,
beatmap_id=score.beatmap_id,
gamemode=score.gamemode,
score_id=score.id,
total_score=score.total_score,
rank=score.rank,
mods=mod_for_save,
)
session.add(previous_score_best)
statistics.ranked_score += difference
statistics.level_current = calculate_score_to_level(statistics.ranked_score)
statistics.maximum_combo = max(statistics.maximum_combo, score.max_combo)
if score.passed and ranked:
if previous_score_best_mod is not None:
previous_score_best_mod.mods = mod_for_save
previous_score_best_mod.score_id = score.id
previous_score_best_mod.rank = score.rank
previous_score_best_mod.total_score = score.total_score
elif (
previous_score_best is not None and previous_score_best.score_id != score.id
):
session.add(
BestScore(
user_id=user.id,
beatmap_id=score.beatmap_id,
gamemode=score.gamemode,
score_id=score.id,
total_score=score.total_score,
rank=score.rank,
mods=mod_for_save,
)
)
statistics.play_count += 1
mouthly_playcount.playcount += 1
statistics.play_time += int((score.ended_at - score.started_at).total_seconds())
statistics.count_100 += score.n100 + score.nkatu
statistics.count_300 += score.n300 + score.ngeki
statistics.count_50 += score.n50
statistics.count_miss += score.nmiss
statistics.total_hits += (
score.n300 + score.n100 + score.n50 + score.ngeki + score.nkatu
)
@@ -563,11 +599,8 @@ async def process_user(
acc_sum = clamp(acc_sum, 0.0, 100.0)
statistics.pp = pp_sum
statistics.hit_accuracy = acc_sum
statistics.updated_at = datetime.now(UTC)
if add_to_db:
session.add(statistics)
session.add(mouthly_playcount)
await session.commit()
await session.refresh(user)
@@ -582,6 +615,8 @@ async def process_score(
session: AsyncSession,
redis: Redis,
) -> Score:
assert user.id
can_get_pp = info.passed and ranked and mods_can_get_pp(info.ruleset_id, info.mods)
score = Score(
accuracy=info.accuracy,
max_combo=info.max_combo,
@@ -611,7 +646,7 @@ async def process_score(
nlarge_tick_hit=info.statistics.get(HitResult.LARGE_TICK_HIT, 0),
nslider_tail_hit=info.statistics.get(HitResult.SLIDER_TAIL_HIT, 0),
)
if info.passed and ranked and mods_can_get_pp(info.ruleset_id, info.mods):
if can_get_pp:
beatmap_raw = await fetcher.get_or_fetch_beatmap_raw(redis, beatmap_id)
pp = await asyncio.get_event_loop().run_in_executor(
None, calculate_pp, score, beatmap_raw
@@ -621,13 +656,13 @@ async def process_score(
user_id = user.id
await session.commit()
await session.refresh(score)
if score.passed and ranked:
if can_get_pp:
previous_pp_best = await get_user_best_pp_in_beatmap(
session, beatmap_id, user_id, score.gamemode
)
if previous_pp_best is None or score.pp > previous_pp_best.pp:
assert score.id
best_score = BestScore(
best_score = PPBestScore(
user_id=user_id,
score_id=score.id,
beatmap_id=beatmap_id,
@@ -636,7 +671,7 @@ async def process_score(
acc=score.accuracy,
)
session.add(best_score)
session.delete(previous_pp_best) if previous_pp_best else None
await session.delete(previous_pp_best) if previous_pp_best else None
await session.commit()
await session.refresh(score)
await session.refresh(score_token)

View File

@@ -1,15 +1,16 @@
from datetime import datetime
from app.models.model import UTCBaseModel
from app.models.score import GameMode
from .beatmap import Beatmap
from .user import User
from .lazer_user import User
from sqlalchemy import Column, DateTime, Index
from sqlmodel import BigInteger, Field, ForeignKey, Relationship, SQLModel
class ScoreTokenBase(SQLModel):
class ScoreTokenBase(SQLModel, UTCBaseModel):
score_id: int | None = Field(sa_column=Column(BigInteger), default=None)
ruleset_id: GameMode
playlist_item_id: int | None = Field(default=None) # playlist
@@ -34,10 +35,10 @@ class ScoreToken(ScoreTokenBase, table=True):
autoincrement=True,
),
)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id")))
beatmap_id: int = Field(foreign_key="beatmaps.id")
user: "User" = Relationship()
beatmap: "Beatmap" = Relationship()
user: User = Relationship()
beatmap: Beatmap = Relationship()
class ScoreTokenResp(ScoreTokenBase):

View File

@@ -1,14 +1,16 @@
from datetime import datetime
from typing import TYPE_CHECKING
from app.models.model import UTCBaseModel
from sqlalchemy import Column, DateTime
from sqlmodel import BigInteger, Field, ForeignKey, Relationship, SQLModel
if TYPE_CHECKING:
from .user import User
from .lazer_user import User
class Team(SQLModel, table=True):
class Team(SQLModel, UTCBaseModel, table=True):
__tablename__ = "teams" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
@@ -22,15 +24,19 @@ class Team(SQLModel, table=True):
members: list["TeamMember"] = Relationship(back_populates="team")
class TeamMember(SQLModel, table=True):
class TeamMember(SQLModel, UTCBaseModel, table=True):
__tablename__ = "team_members" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id")))
team_id: int = Field(foreign_key="teams.id")
joined_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="team_membership")
team: "Team" = Relationship(back_populates="members")
user: "User" = Relationship(
back_populates="team_membership", sa_relationship_kwargs={"lazy": "joined"}
)
team: "Team" = Relationship(
back_populates="members", sa_relationship_kwargs={"lazy": "joined"}
)

View File

@@ -1,527 +0,0 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from .legacy import LegacyUserStatistics
from .team import TeamMember
from sqlalchemy import DECIMAL, JSON, Column, Date, DateTime, Text
from sqlalchemy.dialects.mysql import VARCHAR
from sqlalchemy.orm import joinedload, selectinload
from sqlmodel import BigInteger, Field, ForeignKey, Relationship, SQLModel, select
class User(SQLModel, table=True):
__tablename__ = "users" # pyright: ignore[reportAssignmentType]
# 主键
id: int = Field(
default=None, sa_column=Column(BigInteger, primary_key=True, index=True)
)
# 基本信息(匹配 migrations_old 中的结构)
name: str = Field(max_length=32, unique=True, index=True) # 用户名
safe_name: str = Field(max_length=32, unique=True, index=True) # 安全用户名
email: str = Field(max_length=254, unique=True, index=True)
priv: int = Field(default=1) # 权限
pw_bcrypt: str = Field(max_length=60) # bcrypt 哈希密码
country: str = Field(default="CN", max_length=2) # 国家代码
# 状态和时间
silence_end: int = Field(default=0)
donor_end: int = Field(default=0)
creation_time: int = Field(default=0) # Unix 时间戳
latest_activity: int = Field(default=0) # Unix 时间戳
# 游戏相关
preferred_mode: int = Field(default=0) # 偏好游戏模式
play_style: int = Field(default=0) # 游戏风格
# 扩展信息
clan_id: int = Field(default=0)
clan_priv: int = Field(default=0)
custom_badge_name: str | None = Field(default=None, max_length=16)
custom_badge_icon: str | None = Field(default=None, max_length=64)
userpage_content: str | None = Field(default=None, max_length=2048)
api_key: str | None = Field(default=None, max_length=36, unique=True)
# 虚拟字段用于兼容性
@property
def username(self):
return self.name
@property
def country_code(self):
return self.country
@property
def join_date(self):
creation_time = getattr(self, "creation_time", 0)
return (
datetime.fromtimestamp(creation_time)
if creation_time > 0
else datetime.utcnow()
)
@property
def last_visit(self):
latest_activity = getattr(self, "latest_activity", 0)
return datetime.fromtimestamp(latest_activity) if latest_activity > 0 else None
@property
def is_supporter(self):
return self.lazer_profile.is_supporter if self.lazer_profile else False
# 关联关系
lazer_profile: Optional["LazerUserProfile"] = Relationship(back_populates="user")
lazer_statistics: list["LazerUserStatistics"] = Relationship(back_populates="user")
lazer_counts: Optional["LazerUserCounts"] = Relationship(back_populates="user")
lazer_achievements: list["LazerUserAchievement"] = Relationship(
back_populates="user"
)
lazer_profile_sections: list["LazerUserProfileSections"] = Relationship(
back_populates="user"
)
statistics: list["LegacyUserStatistics"] = Relationship(back_populates="user")
team_membership: Optional["TeamMember"] = Relationship(back_populates="user")
daily_challenge_stats: Optional["DailyChallengeStats"] = Relationship(
back_populates="user"
)
rank_history: list["RankHistory"] = Relationship(back_populates="user")
avatar: Optional["UserAvatar"] = Relationship(back_populates="user")
active_banners: list["LazerUserBanners"] = Relationship(back_populates="user")
lazer_badges: list["LazerUserBadge"] = Relationship(back_populates="user")
lazer_monthly_playcounts: list["LazerUserMonthlyPlaycounts"] = Relationship(
back_populates="user"
)
lazer_previous_usernames: list["LazerUserPreviousUsername"] = Relationship(
back_populates="user"
)
lazer_replays_watched: list["LazerUserReplaysWatched"] = Relationship(
back_populates="user"
)
@classmethod
def all_select_option(cls):
return (
joinedload(cls.lazer_profile), # pyright: ignore[reportArgumentType]
joinedload(cls.lazer_counts), # pyright: ignore[reportArgumentType]
joinedload(cls.daily_challenge_stats), # pyright: ignore[reportArgumentType]
joinedload(cls.avatar), # pyright: ignore[reportArgumentType]
selectinload(cls.lazer_statistics), # pyright: ignore[reportArgumentType]
selectinload(cls.lazer_achievements), # pyright: ignore[reportArgumentType]
selectinload(cls.lazer_profile_sections), # pyright: ignore[reportArgumentType]
selectinload(cls.statistics), # pyright: ignore[reportArgumentType]
joinedload(cls.team_membership), # pyright: ignore[reportArgumentType]
selectinload(cls.rank_history), # pyright: ignore[reportArgumentType]
selectinload(cls.active_banners), # pyright: ignore[reportArgumentType]
selectinload(cls.lazer_badges), # pyright: ignore[reportArgumentType]
selectinload(cls.lazer_monthly_playcounts), # pyright: ignore[reportArgumentType]
selectinload(cls.lazer_previous_usernames), # pyright: ignore[reportArgumentType]
selectinload(cls.lazer_replays_watched), # pyright: ignore[reportArgumentType]
)
@classmethod
def all_select_clause(cls):
return select(cls).options(*cls.all_select_option())
# ============================================
# Lazer API 专用表模型
# ============================================
class LazerUserProfile(SQLModel, table=True):
__tablename__ = "lazer_user_profiles" # pyright: ignore[reportAssignmentType]
user_id: int = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
primary_key=True,
),
)
# 基本状态字段
is_active: bool = Field(default=True)
is_bot: bool = Field(default=False)
is_deleted: bool = Field(default=False)
is_online: bool = Field(default=True)
is_supporter: bool = Field(default=False)
is_restricted: bool = Field(default=False)
session_verified: bool = Field(default=False)
has_supported: bool = Field(default=False)
pm_friends_only: bool = Field(default=False)
# 基本资料字段
default_group: str = Field(default="default", max_length=50)
last_visit: datetime | None = Field(default=None, sa_column=Column(DateTime))
join_date: datetime | None = Field(default=None, sa_column=Column(DateTime))
profile_colour: str | None = Field(default=None, max_length=7)
profile_hue: int | None = Field(default=None)
# 社交媒体和个人资料字段
avatar_url: str | None = Field(default=None, max_length=500)
cover_url: str | None = Field(default=None, max_length=500)
discord: str | None = Field(default=None, max_length=100)
twitter: str | None = Field(default=None, max_length=100)
website: str | None = Field(default=None, max_length=500)
title: str | None = Field(default=None, max_length=100)
title_url: str | None = Field(default=None, max_length=500)
interests: str | None = Field(default=None, sa_column=Column(Text))
location: str | None = Field(default=None, max_length=100)
occupation: str | None = Field(default=None) # 职业字段,默认为 None
# 游戏相关字段
playmode: str = Field(default="osu", max_length=10)
support_level: int = Field(default=0)
max_blocks: int = Field(default=100)
max_friends: int = Field(default=500)
post_count: int = Field(default=0)
# 页面内容
page_html: str | None = Field(default=None, sa_column=Column(Text))
page_raw: str | None = Field(default=None, sa_column=Column(Text))
profile_order: str = Field(
default="me,recent_activity,top_ranks,medals,historical,beatmaps,kudosu"
)
# 关联关系
user: "User" = Relationship(back_populates="lazer_profile")
class LazerUserProfileSections(SQLModel, table=True):
__tablename__ = "lazer_user_profile_sections" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
section_name: str = Field(sa_column=Column(VARCHAR(50)))
display_order: int | None = Field(default=None)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="lazer_profile_sections")
class LazerUserCountry(SQLModel, table=True):
__tablename__ = "lazer_user_countries" # pyright: ignore[reportAssignmentType]
user_id: int = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
primary_key=True,
),
)
code: str = Field(max_length=2)
name: str = Field(max_length=100)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
class LazerUserKudosu(SQLModel, table=True):
__tablename__ = "lazer_user_kudosu" # pyright: ignore[reportAssignmentType]
user_id: int = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
primary_key=True,
),
)
available: int = Field(default=0)
total: int = Field(default=0)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
class LazerUserCounts(SQLModel, table=True):
__tablename__ = "lazer_user_counts" # pyright: ignore[reportAssignmentType]
user_id: int = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
primary_key=True,
),
)
# 统计计数字段
beatmap_playcounts_count: int = Field(default=0)
comments_count: int = Field(default=0)
favourite_beatmapset_count: int = Field(default=0)
follower_count: int = Field(default=0)
graveyard_beatmapset_count: int = Field(default=0)
guest_beatmapset_count: int = Field(default=0)
loved_beatmapset_count: int = Field(default=0)
mapping_follower_count: int = Field(default=0)
nominated_beatmapset_count: int = Field(default=0)
pending_beatmapset_count: int = Field(default=0)
ranked_beatmapset_count: int = Field(default=0)
ranked_and_approved_beatmapset_count: int = Field(default=0)
unranked_beatmapset_count: int = Field(default=0)
scores_best_count: int = Field(default=0)
scores_first_count: int = Field(default=0)
scores_pinned_count: int = Field(default=0)
scores_recent_count: int = Field(default=0)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
# 关联关系
user: "User" = Relationship(back_populates="lazer_counts")
class LazerUserStatistics(SQLModel, table=True):
__tablename__ = "lazer_user_statistics" # pyright: ignore[reportAssignmentType]
user_id: int = Field(
default=None,
sa_column=Column(
BigInteger,
ForeignKey("users.id"),
primary_key=True,
),
)
mode: str = Field(default="osu", max_length=10, primary_key=True)
# 基本命中统计
count_100: int = Field(default=0)
count_300: int = Field(default=0)
count_50: int = Field(default=0)
count_miss: int = Field(default=0)
# 等级信息
level_current: int = Field(default=1)
level_progress: int = Field(default=0)
# 排名信息
global_rank: int | None = Field(default=None)
global_rank_exp: int | None = Field(default=None)
country_rank: int | None = Field(default=None)
# PP 和分数
pp: float = Field(default=0.00, sa_column=Column(DECIMAL(10, 2)))
pp_exp: float = Field(default=0.00, sa_column=Column(DECIMAL(10, 2)))
ranked_score: int = Field(default=0, sa_column=Column(BigInteger))
hit_accuracy: float = Field(default=0.00, sa_column=Column(DECIMAL(5, 2)))
total_score: int = Field(default=0, sa_column=Column(BigInteger))
total_hits: int = Field(default=0, sa_column=Column(BigInteger))
maximum_combo: int = Field(default=0)
# 游戏统计
play_count: int = Field(default=0)
play_time: int = Field(default=0) # 秒
replays_watched_by_others: int = Field(default=0)
is_ranked: bool = Field(default=False)
# 成绩等级计数
grade_ss: int = Field(default=0)
grade_ssh: int = Field(default=0)
grade_s: int = Field(default=0)
grade_sh: int = Field(default=0)
grade_a: int = Field(default=0)
# 最高排名记录
rank_highest: int | None = Field(default=None)
rank_highest_updated_at: datetime | None = Field(
default=None, sa_column=Column(DateTime)
)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
# 关联关系
user: "User" = Relationship(back_populates="lazer_statistics")
class LazerUserBanners(SQLModel, table=True):
__tablename__ = "lazer_user_tournament_banners" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
tournament_id: int
image_url: str = Field(sa_column=Column(VARCHAR(500)))
is_active: bool | None = Field(default=None)
# 修正user关系的back_populates值
user: "User" = Relationship(back_populates="active_banners")
class LazerUserAchievement(SQLModel, table=True):
__tablename__ = "lazer_user_achievements" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
achievement_id: int
achieved_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="lazer_achievements")
class LazerUserBadge(SQLModel, table=True):
__tablename__ = "lazer_user_badges" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
badge_id: int
awarded_at: datetime | None = Field(default=None, sa_column=Column(DateTime))
description: str | None = Field(default=None, sa_column=Column(Text))
image_url: str | None = Field(default=None, max_length=500)
url: str | None = Field(default=None, max_length=500)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="lazer_badges")
class LazerUserMonthlyPlaycounts(SQLModel, table=True):
__tablename__ = "lazer_user_monthly_playcounts" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
start_date: datetime = Field(sa_column=Column(Date))
play_count: int = Field(default=0)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="lazer_monthly_playcounts")
class LazerUserPreviousUsername(SQLModel, table=True):
__tablename__ = "lazer_user_previous_usernames" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
username: str = Field(max_length=32)
changed_at: datetime = Field(sa_column=Column(DateTime))
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="lazer_previous_usernames")
class LazerUserReplaysWatched(SQLModel, table=True):
__tablename__ = "lazer_user_replays_watched" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
start_date: datetime = Field(sa_column=Column(Date))
count: int = Field(default=0)
created_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
updated_at: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="lazer_replays_watched")
# 类型转换用的 UserAchievement不是 SQLAlchemy 模型)
@dataclass
class UserAchievement:
achieved_at: datetime
achievement_id: int
class DailyChallengeStats(SQLModel, table=True):
__tablename__ = "daily_challenge_stats" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(
sa_column=Column(BigInteger, ForeignKey("users.id"), unique=True)
)
daily_streak_best: int = Field(default=0)
daily_streak_current: int = Field(default=0)
last_update: datetime | None = Field(default=None, sa_column=Column(DateTime))
last_weekly_streak: datetime | None = Field(
default=None, sa_column=Column(DateTime)
)
playcount: int = Field(default=0)
top_10p_placements: int = Field(default=0)
top_50p_placements: int = Field(default=0)
weekly_streak_best: int = Field(default=0)
weekly_streak_current: int = Field(default=0)
user: "User" = Relationship(back_populates="daily_challenge_stats")
class RankHistory(SQLModel, table=True):
__tablename__ = "rank_history" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
mode: str = Field(max_length=10)
rank_data: list = Field(sa_column=Column(JSON)) # Array of ranks
date_recorded: datetime = Field(
default_factory=datetime.utcnow, sa_column=Column(DateTime)
)
user: "User" = Relationship(back_populates="rank_history")
class UserAvatar(SQLModel, table=True):
__tablename__ = "user_avatars" # pyright: ignore[reportAssignmentType]
id: int | None = Field(default=None, primary_key=True, index=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("users.id")))
filename: str = Field(max_length=255)
original_filename: str = Field(max_length=255)
file_size: int
mime_type: str = Field(max_length=100)
is_active: bool = Field(default=True)
created_at: int = Field(default_factory=lambda: int(datetime.now().timestamp()))
updated_at: int = Field(default_factory=lambda: int(datetime.now().timestamp()))
r2_original_url: str | None = Field(default=None, max_length=500)
r2_game_url: str | None = Field(default=None, max_length=500)
user: "User" = Relationship(back_populates="avatar")