Add public API for player statistics and information queries

This commit is contained in:
咕谷酱
2025-09-11 02:34:01 +08:00
parent 7d79f3cee7
commit e589e68881
6 changed files with 661 additions and 3 deletions

121
app/models/v1_user.py Normal file
View File

@@ -0,0 +1,121 @@
"""V1 API 用户相关模型"""
from __future__ import annotations
from pydantic import BaseModel, Field
class PlayerStatsHistory(BaseModel):
"""玩家 PP 历史数据"""
pp: list[float] = Field(default_factory=list)
class PlayerModeStats(BaseModel):
"""单个模式的玩家统计数据"""
id: int
mode: int
tscore: int # total_score
rscore: int # ranked_score
pp: float
plays: int # play_count
playtime: int # play_time
acc: float # accuracy
max_combo: int # maximum_combo
total_hits: int
replay_views: int # replays_watched_by_others
xh_count: int # grade_ssh
x_count: int # grade_ss
sh_count: int # grade_sh
s_count: int # grade_s
a_count: int # grade_a
level: int
level_progress: int
rank: int
country_rank: int
history: PlayerStatsHistory
class PlayerStatsResponse(BaseModel):
"""玩家统计信息响应 - 包含所有模式"""
stats: dict[str, PlayerModeStats] = Field(default_factory=dict)
class PlayerEventItem(BaseModel):
"""玩家事件项目"""
userId: int
name: str
mapId: int | None = None
setId: int | None = None
artist: str | None = None
title: str | None = None
version: str | None = None
mode: int | None = None
rank: int | None = None
grade: str | None = None
event: str | None = None
time: str | None = None
class PlayerEventsResponse(BaseModel):
"""玩家事件响应"""
events: list[PlayerEventItem] = Field(default_factory=list)
class PlayerInfo(BaseModel):
"""玩家基本信息"""
id: int
name: str
safe_name: str
priv: int
country: str
silence_end: int
donor_end: int
creation_time: int
latest_activity: int
clan_id: int
clan_priv: int
preferred_mode: int
preferred_type: int
play_style: int
custom_badge_enabled: int
custom_badge_name: str
custom_badge_icon: str
custom_badge_color: str
userpage_content: str
recentFailed: int
social_discord: str | None = None
social_youtube: str | None = None
social_twitter: str | None = None
social_twitch: str | None = None
social_github: str | None = None
social_osu: str | None = None
username_history: list[str] = Field(default_factory=list)
class PlayerInfoResponse(BaseModel):
"""玩家信息响应"""
info: PlayerInfo
class PlayerAllResponse(BaseModel):
"""玩家完整信息响应 - 包含所有数据"""
info: PlayerInfo
stats: dict[str, PlayerModeStats] = Field(default_factory=dict)
events: list[PlayerEventItem] = Field(default_factory=list)
class GetPlayerInfoResponse(BaseModel):
"""get_player_info 接口响应"""
status: str = "success"
player: PlayerStatsResponse | PlayerEventsResponse | PlayerInfoResponse | PlayerAllResponse
class PlayerCountData(BaseModel):
"""玩家数量数据"""
online: int
total: int
class GetPlayerCountResponse(BaseModel):
"""get_player_count 接口响应"""
status: str = "success"
counts: PlayerCountData

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from . import beatmap, replay, score, user # noqa: F401
from . import beatmap, replay, score, user, public_user # noqa: F401
from .router import router as api_v1_router
from .public_router import public_router as api_v1_public_router
__all__ = ["api_v1_router"]
__all__ = ["api_v1_router", "api_v1_public_router"]

View File

@@ -0,0 +1,26 @@
from __future__ import annotations
from datetime import datetime
from enum import Enum
from app.dependencies.rate_limit import LIMITERS
from fastapi import APIRouter
from pydantic import BaseModel, field_serializer
# 公共V1 API路由器 - 不需要认证
public_router = APIRouter(prefix="/api/v1", dependencies=LIMITERS, tags=["V1 Public API"])
class AllStrModel(BaseModel):
@field_serializer("*", when_used="json")
def serialize_datetime(self, v, _info):
if isinstance(v, Enum):
return str(v.value)
elif isinstance(v, datetime):
return v.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(v, bool):
return "1" if v else "0"
elif isinstance(v, list):
return [self.serialize_datetime(item, _info) for item in v]
return str(v)

View File

@@ -0,0 +1,344 @@
from __future__ import annotations
from typing import Literal
from app.database.lazer_user import User
from app.database.statistics import UserStatistics
from app.dependencies.database import Database, get_redis
from app.models.v1_user import (
GetPlayerInfoResponse,
PlayerStatsResponse,
PlayerEventsResponse,
PlayerInfoResponse,
PlayerAllResponse,
PlayerStatsHistory,
GetPlayerCountResponse,
PlayerCountData
)
from app.models.score import GameMode
from app.log import logger
from app.router.v1.public_router import public_router, AllStrModel
from fastapi import HTTPException, Query
from sqlmodel import select
async def _create_player_mode_stats(session: Database, user: User, mode: GameMode, user_statistics: list[UserStatistics]):
"""创建指定模式的玩家统计数据"""
from app.models.v1_user import PlayerModeStats
# 查找对应模式的统计数据
statistics = None
for stats in user_statistics:
if stats.mode == mode:
statistics = stats
break
if not statistics:
# 如果没有统计数据,返回默认值
return PlayerModeStats(
id=user.id,
mode=int(mode),
tscore=0,
rscore=0,
pp=0.0,
plays=0,
playtime=0,
acc=0.0,
max_combo=0,
total_hits=0,
replay_views=0,
xh_count=0,
x_count=0,
sh_count=0,
s_count=0,
a_count=0,
level=1,
level_progress=0,
rank=0,
country_rank=0,
history=PlayerStatsHistory()
)
return PlayerModeStats(
id=user.id,
mode=int(mode),
tscore=statistics.total_score if statistics.total_score else 0,
rscore=statistics.ranked_score if statistics.ranked_score else 0,
pp=float(statistics.pp) if statistics.pp else 0.0,
plays=statistics.play_count if statistics.play_count else 0,
playtime=statistics.play_time if statistics.play_time else 0,
acc=float(statistics.hit_accuracy) if statistics.hit_accuracy else 0.0,
max_combo=statistics.maximum_combo if statistics.maximum_combo else 0,
total_hits=statistics.total_hits if statistics.total_hits else 0,
replay_views=statistics.replays_watched_by_others if statistics.replays_watched_by_others else 0,
xh_count=statistics.grade_ssh if statistics.grade_ssh else 0,
x_count=statistics.grade_ss if statistics.grade_ss else 0,
sh_count=statistics.grade_sh if statistics.grade_sh else 0,
s_count=statistics.grade_s if statistics.grade_s else 0,
a_count=statistics.grade_a if statistics.grade_a else 0,
level=int(statistics.level_current) if statistics.level_current else 1,
level_progress=0, # TODO: 计算等级进度
rank=0, # global_rank需要从RankHistory获取
country_rank=0, # country_rank需要从其他地方获取
history=PlayerStatsHistory() # TODO: 获取PP历史数据
)
async def _create_player_info(user: User):
"""创建玩家基本信息"""
from app.models.v1_user import PlayerInfo
return PlayerInfo(
id=user.id,
name=user.username,
safe_name=user.username, # 使用 username 作为 safe_name
priv=user.priv if user.priv else 1,
country=user.country_code if user.country_code else "",
silence_end=int(user.silence_end_at.timestamp()) if user.silence_end_at else 0,
donor_end=int(user.donor_end_at.timestamp()) if user.donor_end_at else 0,
creation_time=int(user.join_date.timestamp()) if user.join_date else 0,
latest_activity=int(user.last_visit.timestamp()) if user.last_visit else 0,
clan_id=0, # TODO: 从 user 获取战队信息
clan_priv=0,
preferred_mode=int(user.playmode) if user.playmode else 0,
preferred_type=0,
play_style=0, # TODO: 从 user.playstyle 获取游戏风格
custom_badge_enabled=0,
custom_badge_name="",
custom_badge_icon="",
custom_badge_color="",
userpage_content=user.page["html"] if user.page and "html" in user.page else "",
recentFailed=0, # TODO: 获取最近失败次数
social_discord=user.discord,
social_youtube=None,
social_twitter=user.twitter,
social_twitch=None,
social_github=None,
social_osu=None,
username_history=user.previous_usernames if user.previous_usernames else []
)
async def _get_player_events(session: Database, user_id: int):
"""获取玩家事件列表"""
# TODO: 实现事件查询逻辑
# 这里应该查询 app.database.events 表
return []
async def _count_online_users_optimized(redis):
"""
优化的在线用户计数函数
首先尝试使用HyperLogLog近似计数失败则回退到SCAN
"""
try:
# 方案1: 尝试使用更高效的方法 - 如果项目维护了在线用户集合
# 检查是否存在在线用户集合键
online_set_key = "metadata:online_users_set"
if await redis.exists(online_set_key):
# 如果存在维护好的在线用户集合,直接获取数量
count = await redis.scard(online_set_key)
logger.debug(f"Using online users set, count: {count}")
return count
except Exception as e:
logger.debug(f"Online users set not available: {e}")
# 方案2: 回退到优化的SCAN操作
online_count = 0
cursor = 0
scan_iterations = 0
max_iterations = 50 # 进一步减少最大迭代次数
batch_size = 10000 # 增加批次大小
try:
while cursor != 0 or scan_iterations == 0:
if scan_iterations >= max_iterations:
logger.warning(f"Redis SCAN reached max iterations ({max_iterations}), breaking")
break
cursor, keys = await redis.scan(
cursor,
match="metadata:online:*",
count=batch_size
)
online_count += len(keys)
scan_iterations += 1
# 如果连续几次没有找到键,可能已经扫描完成
if len(keys) == 0 and scan_iterations > 2:
break
logger.debug(f"Found {online_count} online users after {scan_iterations} scan iterations")
return online_count
except Exception as e:
logger.error(f"Error counting online users: {e}")
# 如果SCAN失败返回0而不是让整个API失败
return 0
@public_router.get(
"/get_player_info",
response_model=GetPlayerInfoResponse,
name="获取玩家信息",
description="返回指定玩家的信息。",
)
async def api_get_player_info(
session: Database,
scope: Literal["stats", "events", "info", "all"] = Query(..., description="信息范围"),
id: int | None = Query(None, ge=3, le=2147483647, description="用户 ID"),
name: str | None = Query(None, regex=r"^[\w \[\]-]{2,32}$", description="用户名"),
):
"""
获取指定玩家的信息
Args:
scope: 信息范围 - stats(统计), events(事件), info(基本信息), all(全部)
id: 用户 ID (可选)
name: 用户名 (可选)
"""
# 验证参数
if not id and not name:
raise HTTPException(400, "Must provide either id or name")
# 查询用户
if id:
user = await session.get(User, id)
else:
user = (await session.exec(select(User).where(User.username == name))).first()
if not user:
raise HTTPException(404, "User not found")
try:
if scope == "stats":
# 获取所有模式的统计数据
user_statistics = list((
await session.exec(
select(UserStatistics).where(UserStatistics.user_id == user.id)
)
).all())
stats_dict = {}
# 获取所有游戏模式的统计数据
all_modes = [GameMode.OSU, GameMode.TAIKO, GameMode.FRUITS, GameMode.MANIA,
GameMode.OSURX, GameMode.OSUAP]
for mode in all_modes:
mode_stats = await _create_player_mode_stats(session, user, mode, user_statistics)
stats_dict[str(int(mode))] = mode_stats
return GetPlayerInfoResponse(
player=PlayerStatsResponse(stats=stats_dict)
)
elif scope == "events":
# 获取事件数据
events = await _get_player_events(session, user.id)
return GetPlayerInfoResponse(
player=PlayerEventsResponse(events=events)
)
elif scope == "info":
# 获取基本信息
info = await _create_player_info(user)
return GetPlayerInfoResponse(
player=PlayerInfoResponse(info=info)
)
elif scope == "all":
# 获取所有信息
# 统计数据
user_statistics = list((
await session.exec(
select(UserStatistics).where(UserStatistics.user_id == user.id)
)
).all())
stats_dict = {}
all_modes = [GameMode.OSU, GameMode.TAIKO, GameMode.FRUITS, GameMode.MANIA,
GameMode.OSURX, GameMode.OSUAP]
for mode in all_modes:
mode_stats = await _create_player_mode_stats(session, user, mode, user_statistics)
stats_dict[str(int(mode))] = mode_stats
# 基本信息
info = await _create_player_info(user)
# 事件
events = await _get_player_events(session, user.id)
return GetPlayerInfoResponse(
player=PlayerAllResponse(
info=info,
stats=stats_dict,
events=events
)
)
except Exception as e:
logger.error(f"Error processing get_player_info for user {user.id}: {e}")
raise HTTPException(500, "Internal server error")
@public_router.get(
"/get_player_count",
response_model=GetPlayerCountResponse,
name="获取玩家数量",
description="返回在线和总用户数量。",
)
async def api_get_player_count(
session: Database,
):
"""
获取玩家数量统计
Returns:
包含在线用户数和总用户数的响应
"""
try:
redis = get_redis()
online_cache_key = "stats:online_users_count"
cached_online = await redis.get(online_cache_key)
if cached_online is not None:
online_count = int(cached_online)
logger.debug(f"Using cached online user count: {online_count}")
else:
logger.debug("Cache miss, scanning Redis for online users")
online_count = await _count_online_users_optimized(redis)
await redis.setex(online_cache_key, 30, str(online_count))
logger.debug(f"Cached online user count: {online_count} for 30 seconds")
cache_key = "stats:total_users"
cached_total = await redis.get(cache_key)
if cached_total is not None:
total_count = int(cached_total)
logger.debug(f"Using cached total user count: {total_count}")
else:
logger.debug("Cache miss, querying database for total user count")
from sqlmodel import func, select
total_count_result = await session.exec(
select(func.count()).select_from(User)
)
total_count = total_count_result.one()
await redis.setex(cache_key, 3600, str(total_count))
logger.debug(f"Cached total user count: {total_count} for 1 hour")
return GetPlayerCountResponse(
counts=PlayerCountData(
online=online_count,
total=max(0, total_count - 1) # 减去1个机器人账户确保不为负数
)
)
except Exception as e:
logger.error(f"Error getting player count: {e}")
raise HTTPException(500, "Internal server error")

View File

@@ -3,17 +3,31 @@ from __future__ import annotations
from datetime import datetime
from typing import Literal
from app.database.events import Event, EventType
from app.database.lazer_user import User
from app.database.rank_history import RankHistory, RankHistoryResp
from app.database.statistics import UserStatistics, UserStatisticsResp
from app.dependencies.database import Database, get_redis
from app.log import logger
from app.models.score import GameMode
from app.models.v1_user import (
GetPlayerInfoResponse,
PlayerAllResponse,
PlayerEventItem,
PlayerEventsResponse,
PlayerInfo,
PlayerInfoResponse,
PlayerModeStats,
PlayerStatsHistory,
PlayerStatsResponse,
)
from app.service.user_cache_service import get_user_cache_service
from .router import AllStrModel, router
from fastapi import BackgroundTasks, HTTPException, Query
from sqlmodel import select
from pydantic import BaseModel, Field
from sqlmodel import col, select
class V1User(AllStrModel):
@@ -153,3 +167,153 @@ async def get_user(
except ValueError as e:
logger.error(f"Error processing V1 user data: {e}")
raise HTTPException(500, "Internal server error")
# 以下为 get_player_info 接口相关的实现函数
async def _get_pp_history_for_mode(session: Database, user_id: int, mode: GameMode, days: int = 30) -> list[float]:
"""获取指定模式的 PP 历史数据"""
try:
# 获取最近 30 天的排名历史(由于没有 PP 历史,我们使用当前的 PP 填充)
stats = (
await session.exec(
select(UserStatistics).where(
UserStatistics.user_id == user_id,
UserStatistics.mode == mode
)
)
).first()
current_pp = stats.pp if stats else 0.0
# 创建 30 天的 PP 历史(使用当前 PP 值填充)
return [current_pp] * days
except Exception as e:
logger.error(f"Error getting PP history for user {user_id}, mode {mode}: {e}")
return [0.0] * days
async def _create_player_mode_stats(
session: Database,
user: User,
mode: GameMode,
user_statistics: list[UserStatistics]
) -> PlayerModeStats:
"""创建单个模式的玩家统计数据"""
# 查找对应模式的统计数据
stats = None
for stat in user_statistics:
if stat.mode == mode:
stats = stat
break
if not stats:
# 如果没有统计数据,创建默认数据
pp_history = [0.0] * 30
return PlayerModeStats(
id=user.id,
mode=int(mode),
tscore=0,
rscore=0,
pp=0.0,
plays=0,
playtime=0,
acc=0.0,
max_combo=0,
total_hits=0,
replay_views=0,
xh_count=0,
x_count=0,
sh_count=0,
s_count=0,
a_count=0,
level=1,
level_progress=0,
rank=0,
country_rank=0,
history=PlayerStatsHistory(pp=pp_history)
)
# 获取排名信息
try:
from app.database.statistics import get_rank
global_rank = await get_rank(session, stats) or 0
country_rank = await get_rank(session, stats, user.country_code) or 0
except Exception as e:
logger.error(f"Error getting rank for user {user.id}: {e}")
global_rank = 0
country_rank = 0
# 获取 PP 历史
pp_history = await _get_pp_history_for_mode(session, user.id, mode)
# 计算等级进度
level_current = int(stats.level_current)
level_progress = int((stats.level_current - level_current) * 100)
return PlayerModeStats(
id=user.id,
mode=int(mode),
tscore=stats.total_score,
rscore=stats.ranked_score,
pp=stats.pp,
plays=stats.play_count,
playtime=stats.play_time,
acc=stats.hit_accuracy,
max_combo=stats.maximum_combo,
total_hits=stats.total_hits,
replay_views=stats.replays_watched_by_others,
xh_count=stats.grade_ssh,
x_count=stats.grade_ss,
sh_count=stats.grade_sh,
s_count=stats.grade_s,
a_count=stats.grade_a,
level=level_current,
level_progress=level_progress,
rank=global_rank,
country_rank=country_rank,
history=PlayerStatsHistory(pp=pp_history)
)
async def _get_player_events(session: Database, user_id: int, event_days: int = 1) -> list[PlayerEventItem]:
"""获取玩家事件"""
try:
# 这里暂时返回空列表,因为事件系统需要更多的实现
# TODO: 实现真正的事件查询
return []
except Exception as e:
logger.error(f"Error getting events for user {user_id}: {e}")
return []
async def _create_player_info(user: User) -> PlayerInfo:
"""创建玩家基本信息"""
return PlayerInfo(
id=user.id,
name=user.username,
safe_name=user.username.lower(), # 使用 username 转小写作为 safe_name
priv=user.priv,
country=user.country_code,
silence_end=int(user.silence_end_at.timestamp()) if user.silence_end_at else 0,
donor_end=int(user.donor_end_at.timestamp()) if user.donor_end_at else 0,
creation_time=int(user.join_date.timestamp()),
latest_activity=int(user.last_visit.timestamp()) if user.last_visit else 0,
clan_id=0, # TODO: 实现战队系统
clan_priv=0,
preferred_mode=int(user.playmode),
preferred_type=0,
play_style=0,
custom_badge_enabled=0,
custom_badge_name="",
custom_badge_icon="",
custom_badge_color="white",
userpage_content=user.page.get("html", "") if user.page else "",
recentFailed=0,
social_discord=user.discord,
social_youtube=None,
social_twitter=user.twitter,
social_twitch=None,
social_github=None,
social_osu=None,
username_history=user.previous_usernames or []
)

View File

@@ -19,6 +19,7 @@ from app.router import (
private_router,
redirect_api_router,
)
from app.router.v1 import api_v1_public_router
from app.router.redirect import redirect_router
from app.scheduler.cache_scheduler import start_cache_scheduler, stop_cache_scheduler
from app.scheduler.database_cleanup_scheduler import (
@@ -157,6 +158,7 @@ app = FastAPI(
app.include_router(api_v2_router)
app.include_router(api_v1_router)
app.include_router(api_v1_public_router)
app.include_router(chat_router)
app.include_router(redirect_api_router)
app.include_router(fetcher_router)