refactor(user): refactor user database
**Breaking Change** 用户表变为 lazer_users 建议删除与用户关联的表进行迁移
This commit is contained in:
459
app/utils.py
459
app/utils.py
@@ -1,465 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from app.database import (
|
||||
LazerUserCounts,
|
||||
LazerUserProfile,
|
||||
LazerUserStatistics,
|
||||
User as DBUser,
|
||||
)
|
||||
from app.models.user import (
|
||||
Country,
|
||||
Cover,
|
||||
DailyChallengeStats,
|
||||
GradeCounts,
|
||||
Kudosu,
|
||||
Level,
|
||||
Page,
|
||||
RankHighest,
|
||||
RankHistory,
|
||||
Statistics,
|
||||
User,
|
||||
UserAchievement,
|
||||
)
|
||||
|
||||
|
||||
def unix_timestamp_to_windows(timestamp: int) -> int:
|
||||
"""Convert a Unix timestamp to a Windows timestamp."""
|
||||
return (timestamp + 62135596800) * 10_000_000
|
||||
|
||||
|
||||
async def convert_db_user_to_api_user(db_user: DBUser, ruleset: str = "osu") -> User:
|
||||
"""将数据库用户模型转换为API用户模型(使用 Lazer 表)"""
|
||||
|
||||
# 从db_user获取基本字段值
|
||||
user_id = getattr(db_user, "id")
|
||||
user_name = getattr(db_user, "name")
|
||||
user_country = getattr(db_user, "country")
|
||||
user_country_code = user_country # 在User模型中,country字段就是country_code
|
||||
|
||||
# 获取 Lazer 用户资料
|
||||
profile = db_user.lazer_profile
|
||||
if not profile:
|
||||
# 如果没有 lazer 资料,使用默认值
|
||||
profile = LazerUserProfile(
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
# 获取 Lazer 用户计数 - 使用正确的 lazer_counts 关系
|
||||
lzrcnt = db_user.lazer_counts
|
||||
|
||||
if not lzrcnt:
|
||||
# 如果没有 lazer 计数,使用默认值
|
||||
lzrcnt = LazerUserCounts(user_id=user_id)
|
||||
|
||||
# 获取指定模式的统计信息
|
||||
user_stats = None
|
||||
if db_user.lazer_statistics:
|
||||
for stat in db_user.lazer_statistics:
|
||||
if stat.mode == ruleset:
|
||||
user_stats = stat
|
||||
break
|
||||
|
||||
if not user_stats:
|
||||
# 如果没有找到指定模式的统计,创建默认统计
|
||||
user_stats = LazerUserStatistics(user_id=user_id)
|
||||
|
||||
# 获取国家信息
|
||||
country_code = db_user.country_code if db_user.country_code is not None else "XX"
|
||||
|
||||
country = Country(code=str(country_code), name=get_country_name(str(country_code)))
|
||||
|
||||
# 获取 Kudosu 信息
|
||||
kudosu = Kudosu(available=0, total=0)
|
||||
|
||||
# 获取计数信息
|
||||
# counts = LazerUserCounts(user_id=user_id)
|
||||
|
||||
# 转换统计信息
|
||||
statistics = Statistics(
|
||||
count_100=user_stats.count_100,
|
||||
count_300=user_stats.count_300,
|
||||
count_50=user_stats.count_50,
|
||||
count_miss=user_stats.count_miss,
|
||||
level=Level(
|
||||
current=user_stats.level_current, progress=user_stats.level_progress
|
||||
),
|
||||
global_rank=user_stats.global_rank,
|
||||
global_rank_exp=user_stats.global_rank_exp,
|
||||
pp=float(user_stats.pp) if user_stats.pp else 0.0,
|
||||
pp_exp=float(user_stats.pp_exp) if user_stats.pp_exp else 0.0,
|
||||
ranked_score=user_stats.ranked_score,
|
||||
hit_accuracy=float(user_stats.hit_accuracy) if user_stats.hit_accuracy else 0.0,
|
||||
play_count=user_stats.play_count,
|
||||
play_time=user_stats.play_time,
|
||||
total_score=user_stats.total_score,
|
||||
total_hits=user_stats.total_hits,
|
||||
maximum_combo=user_stats.maximum_combo,
|
||||
replays_watched_by_others=user_stats.replays_watched_by_others,
|
||||
is_ranked=user_stats.is_ranked,
|
||||
grade_counts=GradeCounts(
|
||||
ss=user_stats.grade_ss,
|
||||
ssh=user_stats.grade_ssh,
|
||||
s=user_stats.grade_s,
|
||||
sh=user_stats.grade_sh,
|
||||
a=user_stats.grade_a,
|
||||
),
|
||||
country_rank=user_stats.country_rank,
|
||||
rank={"country": user_stats.country_rank} if user_stats.country_rank else None,
|
||||
)
|
||||
|
||||
# 转换所有模式的统计信息
|
||||
statistics_rulesets = {}
|
||||
if db_user.lazer_statistics:
|
||||
for stat in db_user.lazer_statistics:
|
||||
statistics_rulesets[stat.mode] = Statistics(
|
||||
count_100=stat.count_100,
|
||||
count_300=stat.count_300,
|
||||
count_50=stat.count_50,
|
||||
count_miss=stat.count_miss,
|
||||
level=Level(current=stat.level_current, progress=stat.level_progress),
|
||||
global_rank=stat.global_rank,
|
||||
global_rank_exp=stat.global_rank_exp,
|
||||
pp=float(stat.pp) if stat.pp else 0.0,
|
||||
pp_exp=float(stat.pp_exp) if stat.pp_exp else 0.0,
|
||||
ranked_score=stat.ranked_score,
|
||||
hit_accuracy=float(stat.hit_accuracy) if stat.hit_accuracy else 0.0,
|
||||
play_count=stat.play_count,
|
||||
play_time=stat.play_time,
|
||||
total_score=stat.total_score,
|
||||
total_hits=stat.total_hits,
|
||||
maximum_combo=stat.maximum_combo,
|
||||
replays_watched_by_others=stat.replays_watched_by_others,
|
||||
is_ranked=stat.is_ranked,
|
||||
grade_counts=GradeCounts(
|
||||
ss=stat.grade_ss,
|
||||
ssh=stat.grade_ssh,
|
||||
s=stat.grade_s,
|
||||
sh=stat.grade_sh,
|
||||
a=stat.grade_a,
|
||||
),
|
||||
country_rank=stat.country_rank,
|
||||
rank={"country": stat.country_rank} if stat.country_rank else None,
|
||||
)
|
||||
|
||||
# 转换国家信息
|
||||
country = Country(code=user_country_code, name=get_country_name(user_country_code))
|
||||
|
||||
# 转换封面信息
|
||||
cover_url = (
|
||||
profile.cover_url
|
||||
if profile and profile.cover_url
|
||||
else "https://assets.ppy.sh/user-profile-covers/default.jpeg"
|
||||
)
|
||||
cover = Cover(
|
||||
custom_url=profile.cover_url if profile else None, url=str(cover_url), id=None
|
||||
)
|
||||
|
||||
# 转换 Kudosu 信息
|
||||
kudosu = Kudosu(available=0, total=0)
|
||||
|
||||
# 转换成就信息
|
||||
user_achievements = []
|
||||
if db_user.lazer_achievements:
|
||||
for achievement in db_user.lazer_achievements:
|
||||
user_achievements.append(
|
||||
UserAchievement(
|
||||
achieved_at=achievement.achieved_at,
|
||||
achievement_id=achievement.achievement_id,
|
||||
)
|
||||
)
|
||||
|
||||
# 转换排名历史
|
||||
rank_history = None
|
||||
rank_history_data = None
|
||||
for rh in db_user.rank_history:
|
||||
if rh.mode == ruleset:
|
||||
rank_history_data = rh.rank_data
|
||||
break
|
||||
|
||||
if rank_history_data:
|
||||
rank_history = RankHistory(mode=ruleset, data=rank_history_data)
|
||||
|
||||
# 转换每日挑战统计
|
||||
# daily_challenge_stats = None
|
||||
# if db_user.daily_challenge_stats:
|
||||
# dcs = db_user.daily_challenge_stats
|
||||
# daily_challenge_stats = DailyChallengeStats(
|
||||
# daily_streak_best=dcs.daily_streak_best,
|
||||
# daily_streak_current=dcs.daily_streak_current,
|
||||
# last_update=dcs.last_update,
|
||||
# last_weekly_streak=dcs.last_weekly_streak,
|
||||
# playcount=dcs.playcount,
|
||||
# top_10p_placements=dcs.top_10p_placements,
|
||||
# top_50p_placements=dcs.top_50p_placements,
|
||||
# user_id=dcs.user_id,
|
||||
# weekly_streak_best=dcs.weekly_streak_best,
|
||||
# weekly_streak_current=dcs.weekly_streak_current,
|
||||
# )
|
||||
|
||||
# 转换最高排名
|
||||
rank_highest = None
|
||||
if user_stats.rank_highest:
|
||||
rank_highest = RankHighest(
|
||||
rank=user_stats.rank_highest,
|
||||
updated_at=user_stats.rank_highest_updated_at or datetime.utcnow(),
|
||||
)
|
||||
|
||||
# 转换团队信息
|
||||
team = None
|
||||
if db_user.team_membership:
|
||||
team_member = db_user.team_membership # 假设用户只属于一个团队
|
||||
team = team_member.team
|
||||
|
||||
# 创建用户对象
|
||||
# 从db_user获取基本字段值
|
||||
user_id = getattr(db_user, "id")
|
||||
user_name = getattr(db_user, "name")
|
||||
user_country = getattr(db_user, "country")
|
||||
|
||||
# 获取用户头像URL
|
||||
avatar_url = None
|
||||
|
||||
# 首先检查 profile 中的 avatar_url
|
||||
if profile and hasattr(profile, "avatar_url") and profile.avatar_url:
|
||||
avatar_url = str(profile.avatar_url)
|
||||
|
||||
# 然后检查是否有关联的头像记录
|
||||
if avatar_url is None and hasattr(db_user, "avatar") and db_user.avatar is not None:
|
||||
if db_user.avatar.r2_game_url:
|
||||
# 优先使用游戏用的头像URL
|
||||
avatar_url = str(db_user.avatar.r2_game_url)
|
||||
elif db_user.avatar.r2_original_url:
|
||||
# 其次使用原始头像URL
|
||||
avatar_url = str(db_user.avatar.r2_original_url)
|
||||
|
||||
# 如果还是没有找到,通过查询获取
|
||||
# if db_session and avatar_url is None:
|
||||
# try:
|
||||
# # 导入UserAvatar模型
|
||||
|
||||
# # 尝试查找用户的头像记录
|
||||
# statement = select(UserAvatar).where(
|
||||
# UserAvatar.user_id == user_id, UserAvatar.is_active == True
|
||||
# )
|
||||
# avatar_record = db_session.exec(statement).first()
|
||||
# if avatar_record is not None:
|
||||
# if avatar_record.r2_game_url is not None:
|
||||
# # 优先使用游戏用的头像URL
|
||||
# avatar_url = str(avatar_record.r2_game_url)
|
||||
# elif avatar_record.r2_original_url is not None:
|
||||
# # 其次使用原始头像URL
|
||||
# avatar_url = str(avatar_record.r2_original_url)
|
||||
# except Exception as e:
|
||||
# print(f"获取用户头像时出错: {e}")
|
||||
# print(f"最终头像URL: {avatar_url}")
|
||||
# 如果仍然没有找到头像URL,则使用默认URL
|
||||
if avatar_url is None:
|
||||
avatar_url = "https://a.gu-osu.gmoe.cc/api/users/avatar/1"
|
||||
|
||||
# 处理 profile_order 列表排序
|
||||
profile_order = [
|
||||
"me",
|
||||
"recent_activity",
|
||||
"top_ranks",
|
||||
"medals",
|
||||
"historical",
|
||||
"beatmaps",
|
||||
"kudosu",
|
||||
]
|
||||
if profile and profile.profile_order:
|
||||
profile_order = profile.profile_order.split(",")
|
||||
|
||||
# 在convert_db_user_to_api_user函数中添加active_tournament_banners处理
|
||||
active_tournament_banners = []
|
||||
if db_user.active_banners:
|
||||
for banner in db_user.active_banners:
|
||||
active_tournament_banners.append(
|
||||
{
|
||||
"tournament_id": banner.tournament_id,
|
||||
"image_url": banner.image_url,
|
||||
"is_active": banner.is_active,
|
||||
}
|
||||
)
|
||||
|
||||
# 在convert_db_user_to_api_user函数中添加badges处理
|
||||
badges = []
|
||||
if db_user.lazer_badges:
|
||||
for badge in db_user.lazer_badges:
|
||||
badges.append(
|
||||
{
|
||||
"badge_id": badge.badge_id,
|
||||
"awarded_at": badge.awarded_at,
|
||||
"description": badge.description,
|
||||
"image_url": badge.image_url,
|
||||
"url": badge.url,
|
||||
}
|
||||
)
|
||||
|
||||
# 在convert_db_user_to_api_user函数中添加monthly_playcounts处理
|
||||
monthly_playcounts = []
|
||||
if db_user.lazer_monthly_playcounts:
|
||||
for playcount in db_user.lazer_monthly_playcounts:
|
||||
monthly_playcounts.append(
|
||||
{
|
||||
"start_date": playcount.start_date.isoformat()
|
||||
if playcount.start_date
|
||||
else None,
|
||||
"play_count": playcount.play_count,
|
||||
}
|
||||
)
|
||||
|
||||
# 在convert_db_user_to_api_user函数中添加previous_usernames处理
|
||||
previous_usernames = []
|
||||
if db_user.lazer_previous_usernames:
|
||||
for username in db_user.lazer_previous_usernames:
|
||||
previous_usernames.append(
|
||||
{
|
||||
"username": username.username,
|
||||
"changed_at": username.changed_at.isoformat()
|
||||
if username.changed_at
|
||||
else None,
|
||||
}
|
||||
)
|
||||
|
||||
# 在convert_db_user_to_api_user函数中添加replays_watched_counts处理
|
||||
replays_watched_counts = []
|
||||
if hasattr(db_user, "lazer_replays_watched") and db_user.lazer_replays_watched:
|
||||
for replay in db_user.lazer_replays_watched:
|
||||
replays_watched_counts.append(
|
||||
{
|
||||
"start_date": replay.start_date.isoformat()
|
||||
if replay.start_date
|
||||
else None,
|
||||
"count": replay.count,
|
||||
}
|
||||
)
|
||||
|
||||
# 创建用户对象
|
||||
user = User(
|
||||
id=user_id,
|
||||
username=user_name,
|
||||
avatar_url=avatar_url,
|
||||
country_code=str(country_code),
|
||||
default_group=profile.default_group if profile else "default",
|
||||
is_active=profile.is_active,
|
||||
is_bot=profile.is_bot,
|
||||
is_deleted=profile.is_deleted,
|
||||
is_online=profile.is_online,
|
||||
is_supporter=profile.is_supporter,
|
||||
is_restricted=profile.is_restricted,
|
||||
last_visit=db_user.last_visit,
|
||||
pm_friends_only=profile.pm_friends_only,
|
||||
profile_colour=profile.profile_colour,
|
||||
cover_url=profile.cover_url
|
||||
if profile and profile.cover_url
|
||||
else "https://assets.ppy.sh/user-profile-covers/default.jpeg",
|
||||
discord=profile.discord if profile else None,
|
||||
has_supported=profile.has_supported if profile else False,
|
||||
interests=profile.interests if profile else None,
|
||||
join_date=profile.join_date if profile.join_date else datetime.now(UTC),
|
||||
location=profile.location if profile else None,
|
||||
max_blocks=profile.max_blocks if profile and profile.max_blocks else 100,
|
||||
max_friends=profile.max_friends if profile and profile.max_friends else 500,
|
||||
post_count=profile.post_count if profile and profile.post_count else 0,
|
||||
profile_hue=profile.profile_hue if profile and profile.profile_hue else None,
|
||||
profile_order=profile_order, # 使用排序后的 profile_order
|
||||
title=profile.title if profile else None,
|
||||
title_url=profile.title_url if profile else None,
|
||||
twitter=profile.twitter if profile else None,
|
||||
website=profile.website if profile else None,
|
||||
session_verified=True,
|
||||
support_level=profile.support_level if profile else 0,
|
||||
country=country,
|
||||
cover=cover,
|
||||
kudosu=kudosu,
|
||||
statistics=statistics,
|
||||
statistics_rulesets=statistics_rulesets,
|
||||
beatmap_playcounts_count=lzrcnt.beatmap_playcounts_count if lzrcnt else 0,
|
||||
comments_count=lzrcnt.comments_count if lzrcnt else 0,
|
||||
favourite_beatmapset_count=lzrcnt.favourite_beatmapset_count if lzrcnt else 0,
|
||||
follower_count=lzrcnt.follower_count if lzrcnt else 0,
|
||||
graveyard_beatmapset_count=lzrcnt.graveyard_beatmapset_count if lzrcnt else 0,
|
||||
guest_beatmapset_count=lzrcnt.guest_beatmapset_count if lzrcnt else 0,
|
||||
loved_beatmapset_count=lzrcnt.loved_beatmapset_count if lzrcnt else 0,
|
||||
mapping_follower_count=lzrcnt.mapping_follower_count if lzrcnt else 0,
|
||||
nominated_beatmapset_count=lzrcnt.nominated_beatmapset_count if lzrcnt else 0,
|
||||
pending_beatmapset_count=lzrcnt.pending_beatmapset_count if lzrcnt else 0,
|
||||
ranked_beatmapset_count=lzrcnt.ranked_beatmapset_count if lzrcnt else 0,
|
||||
ranked_and_approved_beatmapset_count=lzrcnt.ranked_and_approved_beatmapset_count
|
||||
if lzrcnt
|
||||
else 0,
|
||||
unranked_beatmapset_count=lzrcnt.unranked_beatmapset_count if lzrcnt else 0,
|
||||
scores_best_count=lzrcnt.scores_best_count if lzrcnt else 0,
|
||||
scores_first_count=lzrcnt.scores_first_count if lzrcnt else 0,
|
||||
scores_pinned_count=lzrcnt.scores_pinned_count,
|
||||
scores_recent_count=lzrcnt.scores_recent_count if lzrcnt else 0,
|
||||
account_history=[], # TODO: 获取用户历史账户信息
|
||||
# active_tournament_banner=len(active_tournament_banners),
|
||||
active_tournament_banners=active_tournament_banners,
|
||||
badges=badges,
|
||||
current_season_stats=None,
|
||||
daily_challenge_user_stats=DailyChallengeStats(
|
||||
user_id=user_id,
|
||||
daily_streak_best=db_user.daily_challenge_stats.daily_streak_best
|
||||
if db_user.daily_challenge_stats
|
||||
else 0,
|
||||
daily_streak_current=db_user.daily_challenge_stats.daily_streak_current
|
||||
if db_user.daily_challenge_stats
|
||||
else 0,
|
||||
last_update=db_user.daily_challenge_stats.last_update
|
||||
if db_user.daily_challenge_stats
|
||||
else None,
|
||||
last_weekly_streak=db_user.daily_challenge_stats.last_weekly_streak
|
||||
if db_user.daily_challenge_stats
|
||||
else None,
|
||||
playcount=db_user.daily_challenge_stats.playcount
|
||||
if db_user.daily_challenge_stats
|
||||
else 0,
|
||||
top_10p_placements=db_user.daily_challenge_stats.top_10p_placements
|
||||
if db_user.daily_challenge_stats
|
||||
else 0,
|
||||
top_50p_placements=db_user.daily_challenge_stats.top_50p_placements
|
||||
if db_user.daily_challenge_stats
|
||||
else 0,
|
||||
weekly_streak_best=db_user.daily_challenge_stats.weekly_streak_best
|
||||
if db_user.daily_challenge_stats
|
||||
else 0,
|
||||
weekly_streak_current=db_user.daily_challenge_stats.weekly_streak_current
|
||||
if db_user.daily_challenge_stats
|
||||
else 0,
|
||||
),
|
||||
groups=[],
|
||||
monthly_playcounts=monthly_playcounts,
|
||||
page=Page(html=profile.page_html or "", raw=profile.page_raw or "")
|
||||
if profile.page_html or profile.page_raw
|
||||
else Page(),
|
||||
previous_usernames=previous_usernames,
|
||||
rank_highest=rank_highest,
|
||||
rank_history=rank_history,
|
||||
rankHistory=rank_history,
|
||||
replays_watched_counts=replays_watched_counts,
|
||||
team=team,
|
||||
user_achievements=user_achievements,
|
||||
)
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def get_country_name(country_code: str) -> str:
|
||||
"""根据国家代码获取国家名称"""
|
||||
country_names = {
|
||||
"CN": "China",
|
||||
"JP": "Japan",
|
||||
"US": "United States",
|
||||
"GB": "United Kingdom",
|
||||
"DE": "Germany",
|
||||
"FR": "France",
|
||||
"KR": "South Korea",
|
||||
"CA": "Canada",
|
||||
"AU": "Australia",
|
||||
"BR": "Brazil",
|
||||
# 可以添加更多国家
|
||||
}
|
||||
return country_names.get(country_code, "Unknown")
|
||||
|
||||
Reference in New Issue
Block a user