diff --git a/app/models/score.py b/app/models/score.py index 9ea1a4d..410e3a1 100644 --- a/app/models/score.py +++ b/app/models/score.py @@ -49,6 +49,9 @@ class GameMode(str, Enum): GameMode.FRUITSRX: 2, }[self] + def __str__(self) -> str: + return self.value + @classmethod def from_int(cls, v: int) -> "GameMode": return { diff --git a/app/router/v2/ranking.py b/app/router/v2/ranking.py index fe884c9..9fd1e86 100644 --- a/app/router/v2/ranking.py +++ b/app/router/v2/ranking.py @@ -3,8 +3,7 @@ from __future__ import annotations from typing import Literal from app.config import settings -from app.database import User -from app.database.statistics import UserStatistics, UserStatisticsResp +from app.database import Team, TeamMember, User, UserStatistics, UserStatisticsResp from app.dependencies import get_current_user from app.dependencies.database import Database, get_redis from app.models.score import GameMode @@ -13,10 +12,132 @@ from app.service.ranking_cache_service import get_ranking_cache_service from .router import router from fastapi import BackgroundTasks, Path, Query, Security +from fastapi.responses import RedirectResponse from pydantic import BaseModel from sqlmodel import col, select +class TeamStatistics(BaseModel): + team_id: int + ruleset_id: int + play_count: int + ranked_score: int + performance: int + + +class TeamResponse(BaseModel): + ranking: list[TeamStatistics] + + +SortType = Literal["performance", "score"] + + +@router.get( + "/rankings/{ruleset}/team", + name="获取战队排行榜", + description="获取在指定模式下按照 pp 排序的战队排行榜", + tags=["排行榜"], + status_code=301, +) +async def get_team_ranking_pp( + ruleset: GameMode = Path(..., description="指定 ruleset"), + page: int = Query(1, ge=1, description="页码"), +): + return RedirectResponse(url=f"/api/v2/rankings/{ruleset}/team/performance?page={page}", status_code=301) + + +@router.get( + "/rankings/{ruleset}/team/{sort}", + response_model=TeamResponse, + name="获取战队排行榜", + description="获取在指定模式下的战队排行榜", + tags=["排行榜"], +) +async def get_team_ranking( + session: Database, + background_tasks: BackgroundTasks, + sort: SortType = Path( + ..., + description="排名类型:performance 表现分 / score 计分成绩总分 " + "**这个参数是本服务器额外添加的,不属于 v2 API 的一部分**", + ), + ruleset: GameMode = Path(..., description="指定 ruleset"), + page: int = Query(1, ge=1, description="页码"), + current_user: User = Security(get_current_user, scopes=["public"]), +): + # 获取 Redis 连接和缓存服务 + redis = get_redis() + cache_service = get_ranking_cache_service(redis) + + # 尝试从缓存获取数据(战队排行榜) + cached_data = await cache_service.get_cached_team_ranking(ruleset, page) + + if cached_data: + # 从缓存返回数据 + return TeamResponse(ranking=[TeamStatistics.model_validate(item) for item in cached_data]) + + # 缓存未命中,从数据库查询 + response = TeamResponse(ranking=[]) + teams = (await session.exec(select(Team))).all() + + for team in teams: + statistics = ( + await session.exec( + select(UserStatistics).where( + UserStatistics.mode == ruleset, + UserStatistics.pp > 0, + col(UserStatistics.user).has(col(User.team_membership).has(col(TeamMember.team_id) == team.id)), + ) + ) + ).all() + + if not statistics: + continue + + pp = 0 + stats = TeamStatistics( + team_id=team.id, + ruleset_id=int(ruleset), + play_count=0, + ranked_score=0, + performance=0, + ) + for stat in statistics: + stats.ranked_score += stat.ranked_score + pp += stat.pp + stats.performance = round(pp) + response.ranking.append(stats) + + if sort == "performance": + response.ranking.sort(key=lambda x: x.performance, reverse=True) + else: + response.ranking.sort(key=lambda x: x.ranked_score, reverse=True) + + # 分页处理 + page_size = 50 + start_idx = (page - 1) * page_size + end_idx = start_idx + page_size + + # 获取当前页的数据 + current_page_data = response.ranking[start_idx:end_idx] + + # 异步缓存数据(不等待完成) + cache_data = [item.model_dump() for item in current_page_data] + + # 创建后台任务来缓存数据 + background_tasks.add_task( + cache_service.cache_team_ranking, + ruleset, + cache_data, + page, + ttl=settings.ranking_cache_expire_minutes * 60, + ) + + # 返回当前页的结果 + response.ranking = current_page_data + return response + + class CountryStatistics(BaseModel): code: str active_users: int @@ -31,6 +152,20 @@ class CountryResponse(BaseModel): @router.get( "/rankings/{ruleset}/country", + name="获取地区排行榜", + description="获取在指定模式下按照 pp 排序的地区排行榜", + tags=["排行榜"], + status_code=301, +) +async def get_country_ranking_pp( + ruleset: GameMode = Path(..., description="指定 ruleset"), + page: int = Query(1, ge=1, description="页码"), +): + return RedirectResponse(url=f"/api/v2/rankings/{ruleset}/country/performance?page={page}", status_code=301) + + +@router.get( + "/rankings/{ruleset}/country/{sort}", response_model=CountryResponse, name="获取地区排行榜", description="获取在指定模式下的地区排行榜", @@ -40,7 +175,12 @@ async def get_country_ranking( session: Database, background_tasks: BackgroundTasks, ruleset: GameMode = Path(..., description="指定 ruleset"), - page: int = Query(1, ge=1, description="页码"), # TODO + page: int = Query(1, ge=1, description="页码"), + sort: SortType = Path( + ..., + description="排名类型:performance 表现分 / score 计分成绩总分 " + "**这个参数是本服务器额外添加的,不属于 v2 API 的一部分**", + ), current_user: User = Security(get_current_user, scopes=["public"]), ): # 获取 Redis 连接和缓存服务 @@ -92,7 +232,10 @@ async def get_country_ranking( country_stats.performance = round(pp) response.ranking.append(country_stats) - response.ranking.sort(key=lambda x: x.performance, reverse=True) + if sort == "performance": + response.ranking.sort(key=lambda x: x.performance, reverse=True) + else: + response.ranking.sort(key=lambda x: x.ranked_score, reverse=True) # 分页处理 page_size = 50 @@ -124,7 +267,7 @@ class TopUsersResponse(BaseModel): @router.get( - "/rankings/{ruleset}/{type}", + "/rankings/{ruleset}/{sort}", response_model=TopUsersResponse, name="获取用户排行榜", description="获取在指定模式下的用户排行榜", @@ -134,7 +277,7 @@ async def get_user_ranking( session: Database, background_tasks: BackgroundTasks, ruleset: GameMode = Path(..., description="指定 ruleset"), - type: Literal["performance", "score"] = Path(..., description="排名类型:performance 表现分 / score 计分成绩总分"), + sort: SortType = Path(..., description="排名类型:performance 表现分 / score 计分成绩总分"), country: str | None = Query(None, description="国家代码"), page: int = Query(1, ge=1, description="页码"), current_user: User = Security(get_current_user, scopes=["public"]), @@ -144,7 +287,7 @@ async def get_user_ranking( cache_service = get_ranking_cache_service(redis) # 尝试从缓存获取数据 - cached_data = await cache_service.get_cached_ranking(ruleset, type, country, page) + cached_data = await cache_service.get_cached_ranking(ruleset, sort, country, page) if cached_data: # 从缓存返回数据 @@ -157,7 +300,7 @@ async def get_user_ranking( col(UserStatistics.is_ranked).is_(True), ] include = ["user"] - if type == "performance": + if sort == "performance": order_by = col(UserStatistics.pp).desc() include.append("rank_change_since_30_days") else: @@ -183,7 +326,7 @@ async def get_user_ranking( background_tasks.add_task( cache_service.cache_ranking, ruleset, - type, + sort, cache_data, country, page, @@ -192,132 +335,3 @@ async def get_user_ranking( resp = TopUsersResponse(ranking=ranking_data) return resp - - -# @router.post( -# "/rankings/cache/refresh", -# name="刷新排行榜缓存", -# description="手动刷新排行榜缓存(管理员功能)", -# tags=["排行榜", "管理"], -# ) -# async def refresh_ranking_cache( -# session: Database, -# ruleset: GameMode | None = Query(None, description="指定要刷新的游戏模式,不指定则刷新所有"), -# type: Literal["performance", "score"] | None = Query(None, description="指定要刷新的排名类型,不指定则刷新所有"), -# country: str | None = Query(None, description="指定要刷新的国家,不指定则刷新所有"), -# include_country_ranking: bool = Query(True, description="是否包含地区排行榜"), -# current_user: User = Security(get_current_user, scopes=["admin"]), # 需要管理员权限 -# ): -# redis = get_redis() -# cache_service = get_ranking_cache_service(redis) - -# if ruleset and type: -# # 刷新特定的用户排行榜 -# await cache_service.refresh_ranking_cache(session, ruleset, type, country) -# message = f"Refreshed ranking cache for {ruleset}:{type}" + (f" in {country}" if country else "") - -# # 如果请求刷新地区排行榜 -# if include_country_ranking and not country: # 地区排行榜不依赖于国家参数 -# await cache_service.refresh_country_ranking_cache(session, ruleset) -# message += f" and country ranking for {ruleset}" - -# return {"message": message} -# elif ruleset: -# # 刷新特定游戏模式的所有排行榜 -# ranking_types: list[Literal["performance", "score"]] = ["performance", "score"] -# for ranking_type in ranking_types: -# await cache_service.refresh_ranking_cache(session, ruleset, ranking_type, country) - -# if include_country_ranking: -# await cache_service.refresh_country_ranking_cache(session, ruleset) - -# return {"message": f"Refreshed all ranking caches for {ruleset}"} -# else: -# # 刷新所有排行榜 -# await cache_service.refresh_all_rankings(session) -# return {"message": "Refreshed all ranking caches"} - - -# @router.post( -# "/rankings/{ruleset}/country/cache/refresh", -# name="刷新地区排行榜缓存", -# description="手动刷新地区排行榜缓存(管理员功能)", -# tags=["排行榜", "管理"], -# ) -# async def refresh_country_ranking_cache( -# session: Database, -# ruleset: GameMode = Path(..., description="指定要刷新的游戏模式"), -# current_user: User = Security(get_current_user, scopes=["admin"]), # 需要管理员权限 -# ): -# redis = get_redis() -# cache_service = get_ranking_cache_service(redis) - -# await cache_service.refresh_country_ranking_cache(session, ruleset) -# return {"message": f"Refreshed country ranking cache for {ruleset}"} - - -# @router.delete( -# "/rankings/cache", -# name="清除排行榜缓存", -# description="清除排行榜缓存(管理员功能)", -# tags=["排行榜", "管理"], -# ) -# async def clear_ranking_cache( -# ruleset: GameMode | None = Query(None, description="指定要清除的游戏模式,不指定则清除所有"), -# type: Literal["performance", "score"] | None = Query(None, description="指定要清除的排名类型,不指定则清除所有"), -# country: str | None = Query(None, description="指定要清除的国家,不指定则清除所有"), -# include_country_ranking: bool = Query(True, description="是否包含地区排行榜"), -# current_user: User = Security(get_current_user, scopes=["admin"]), # 需要管理员权限 -# ): -# redis = get_redis() -# cache_service = get_ranking_cache_service(redis) - -# await cache_service.invalidate_cache(ruleset, type, country, include_country_ranking) - -# if ruleset and type: -# message = f"Cleared ranking cache for {ruleset}:{type}" + (f" in {country}" if country else "") -# if include_country_ranking: -# message += " and country ranking" -# return {"message": message} -# else: -# message = "Cleared all ranking caches" -# if include_country_ranking: -# message += " including country rankings" -# return {"message": message} - - -# @router.delete( -# "/rankings/{ruleset}/country/cache", -# name="清除地区排行榜缓存", -# description="清除地区排行榜缓存(管理员功能)", -# tags=["排行榜", "管理"], -# ) -# async def clear_country_ranking_cache( -# ruleset: GameMode | None = Query(None, description="指定要清除的游戏模式,不指定则清除所有"), -# current_user: User = Security(get_current_user, scopes=["admin"]), # 需要管理员权限 -# ): -# redis = get_redis() -# cache_service = get_ranking_cache_service(redis) - -# await cache_service.invalidate_country_cache(ruleset) - -# if ruleset: -# return {"message": f"Cleared country ranking cache for {ruleset}"} -# else: -# return {"message": "Cleared all country ranking caches"} - - -# @router.get( -# "/rankings/cache/stats", -# name="获取排行榜缓存统计", -# description="获取排行榜缓存统计信息(管理员功能)", -# tags=["排行榜", "管理"], -# ) -# async def get_ranking_cache_stats( -# current_user: User = Security(get_current_user, scopes=["admin"]), # 需要管理员权限 -# ): -# redis = get_redis() -# cache_service = get_ranking_cache_service(redis) - -# stats = await cache_service.get_cache_stats() -# return stats diff --git a/app/service/ranking_cache_service.py b/app/service/ranking_cache_service.py index 6cbce52..fa2f4d2 100644 --- a/app/service/ranking_cache_service.py +++ b/app/service/ranking_cache_service.py @@ -76,6 +76,14 @@ class RankingCacheService: """生成地区排行榜统计信息缓存键""" return f"country_ranking:stats:{ruleset}" + def _get_team_cache_key(self, ruleset: GameMode, page: int = 1) -> str: + """生成战队排行榜缓存键""" + return f"team_ranking:{ruleset}:page:{page}" + + def _get_team_stats_cache_key(self, ruleset: GameMode) -> str: + """生成战队排行榜统计信息缓存键""" + return f"team_ranking:stats:{ruleset}" + async def get_cached_ranking( self, ruleset: GameMode, @@ -186,6 +194,69 @@ class RankingCacheService: except Exception as e: logger.error(f"Error caching country ranking: {e}") + async def get_cached_team_ranking( + self, + ruleset: GameMode, + page: int = 1, + ) -> list[dict] | None: + """获取缓存的战队排行榜数据""" + try: + cache_key = self._get_team_cache_key(ruleset, page) + cached_data = await self.redis.get(cache_key) + + if cached_data: + return json.loads(cached_data) + return None + except Exception as e: + logger.error(f"Error getting cached team ranking: {e}") + return None + + async def cache_team_ranking( + self, + ruleset: GameMode, + ranking_data: list[dict], + page: int = 1, + ttl: int | None = None, + ) -> None: + """缓存战队排行榜数据""" + try: + cache_key = self._get_team_cache_key(ruleset, page) + if ttl is None: + ttl = settings.ranking_cache_expire_minutes * 60 + await self.redis.set(cache_key, safe_json_dumps(ranking_data), ex=ttl) + logger.debug(f"Cached team ranking data for {cache_key}") + except Exception as e: + logger.error(f"Error caching team ranking: {e}") + + async def get_cached_team_stats(self, ruleset: GameMode) -> dict | None: + """获取缓存的战队排行榜统计信息""" + try: + cache_key = self._get_team_stats_cache_key(ruleset) + cached_data = await self.redis.get(cache_key) + + if cached_data: + return json.loads(cached_data) + return None + except Exception as e: + logger.error(f"Error getting cached team stats: {e}") + return None + + async def cache_team_stats( + self, + ruleset: GameMode, + stats: dict, + ttl: int | None = None, + ) -> None: + """缓存战队排行榜统计信息""" + try: + cache_key = self._get_team_stats_cache_key(ruleset) + if ttl is None: + ttl = settings.ranking_cache_expire_minutes * 60 * 6 + await self.redis.set(cache_key, safe_json_dumps(stats), ex=ttl) + logger.debug(f"Cached team stats for {cache_key}") + except Exception as e: + logger.error(f"Error caching team stats: {e}") + async def get_cached_country_stats(self, ruleset: GameMode) -> dict | None: """获取缓存的地区排行榜统计信息""" try: