refactor(assets_proxy): use decorators to simplify code

This commit is contained in:
MingxuanGame
2025-10-03 17:12:28 +00:00
parent d490239f46
commit 046f894407
53 changed files with 151 additions and 313 deletions

View File

@@ -1,79 +0,0 @@
"""
资源代理辅助函数和中间件
"""
from __future__ import annotations
from typing import Any
from app.config import settings
from app.service.asset_proxy_service import get_asset_proxy_service
from fastapi import Request
async def process_response_assets(data: Any) -> Any:
"""
根据配置处理响应数据中的资源URL
Args:
data: API响应数据
request: FastAPI请求对象
Returns:
处理后的数据
"""
if not settings.enable_asset_proxy:
return data
asset_service = get_asset_proxy_service()
# 仅URL替换模式
return await asset_service.replace_asset_urls(data)
def should_process_asset_proxy(path: str) -> bool:
"""
判断路径是否需要处理资源代理
"""
# 只对特定的API端点处理资源代理
asset_proxy_endpoints = [
"/api/v1/users/",
"/api/v2/users/",
"/api/v1/me/",
"/api/v2/me/",
"/api/v2/beatmapsets/search",
"/api/v2/beatmapsets/lookup",
"/api/v2/beatmaps/",
"/api/v1/beatmaps/",
"/api/v2/beatmapsets/",
# 可以根据需要添加更多端点
]
return any(path.startswith(endpoint) for endpoint in asset_proxy_endpoints)
# 响应处理装饰器
def asset_proxy_response(func):
"""
装饰器自动处理响应中的资源URL
"""
async def wrapper(*args, **kwargs):
# 获取request对象
request = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
# 执行原函数
result = await func(*args, **kwargs)
# 如果有request对象且启用了资源代理则处理响应
if request and settings.enable_asset_proxy and should_process_asset_proxy(request.url.path):
result = await process_response_assets(result)
return result
return wrapper

View File

@@ -1,83 +0,0 @@
"""
资源文件代理服务
提供URL替换方案将osu!官方资源URL替换为自定义域名
"""
from __future__ import annotations
import re
from typing import Any
from app.config import settings
class AssetProxyService:
"""资源代理服务 - 仅URL替换模式"""
def __init__(self):
# 从配置获取自定义assets域名和前缀
self.custom_asset_domain = settings.custom_asset_domain
self.asset_proxy_prefix = settings.asset_proxy_prefix
self.avatar_proxy_prefix = settings.avatar_proxy_prefix
self.beatmap_proxy_prefix = settings.beatmap_proxy_prefix
# 音频代理接口URL
self.audio_proxy_base_url = f"{settings.server_url}api/private/audio/beatmapset"
async def replace_asset_urls(self, data: Any) -> Any:
"""
递归替换数据中的osu!资源URL为自定义域名
"""
# 处理Pydantic模型
if hasattr(data, "model_dump"):
# 转换为字典,处理后再转换回模型
data_dict = data.model_dump()
processed_dict = await self.replace_asset_urls(data_dict)
# 尝试从字典重新创建模型
try:
return data.__class__(**processed_dict)
except Exception:
# 如果重新创建失败,返回字典
return processed_dict
elif isinstance(data, dict):
result = {}
for key, value in data.items():
result[key] = await self.replace_asset_urls(value)
return result
elif isinstance(data, list):
return [await self.replace_asset_urls(item) for item in data]
elif isinstance(data, str):
# 替换各种osu!资源域名
result = data
# 替换 assets.ppy.sh (用户头像、封面、奖章等)
result = re.sub(
r"https://assets\.ppy\.sh/", f"https://{self.asset_proxy_prefix}.{self.custom_asset_domain}/", result
)
# 替换 b.ppy.sh 预览音频为我们的音频代理接口
# 匹配 https://b.ppy.sh/preview/{beatmapset_id}.mp3 格式
result = re.sub(r"https://b\.ppy\.sh/preview/(\d+)\.mp3", rf"{self.audio_proxy_base_url}/\1", result)
# 匹配 //b.ppy.sh/preview/{beatmapset_id}.mp3 格式
result = re.sub(r"//b\.ppy\.sh/preview/(\d+)\.mp3", rf"{self.audio_proxy_base_url}/\1", result)
# 替换 a.ppy.sh 头像
result = re.sub(
r"https://a\.ppy\.sh/", f"https://{self.avatar_proxy_prefix}.{self.custom_asset_domain}/", result
)
return result
else:
return data
# 全局实例
_asset_proxy_service: AssetProxyService | None = None
def get_asset_proxy_service() -> AssetProxyService:
"""获取资源代理服务实例"""
global _asset_proxy_service
if _asset_proxy_service is None:
_asset_proxy_service = AssetProxyService()
return _asset_proxy_service

View File

@@ -12,9 +12,9 @@ from typing import TYPE_CHECKING, Literal
from app.config import settings
from app.database.statistics import UserStatistics, UserStatisticsResp
from app.helpers.asset_proxy_helper import replace_asset_urls
from app.log import logger
from app.models.score import GameMode
from app.service.asset_proxy_service import get_asset_proxy_service
from app.utils import utcnow
from redis.asyncio import Redis
@@ -357,16 +357,15 @@ class RankingCacheService:
for statistics in statistics_data:
user_stats_resp = await UserStatisticsResp.from_db(statistics, session, None, include)
user_dict = user_stats_resp.model_dump()
# 应用资源代理处理
if settings.enable_asset_proxy:
try:
asset_proxy_service = get_asset_proxy_service()
user_stats_resp = await asset_proxy_service.replace_asset_urls(user_stats_resp)
user_dict = await replace_asset_urls(user_dict)
except Exception as e:
logger.warning(f"Asset proxy processing failed for ranking cache: {e}")
# 将 UserStatisticsResp 转换为字典,处理所有序列化问题
user_dict = json.loads(user_stats_resp.model_dump_json())
ranking_data.append(user_dict)
# 缓存这一页的数据

View File

@@ -15,9 +15,9 @@ from app.database import User, UserResp
from app.database.score import LegacyScoreResp, ScoreResp
from app.database.user import SEARCH_INCLUDED
from app.dependencies.database import with_db
from app.helpers.asset_proxy_helper import replace_asset_urls
from app.log import logger
from app.models.score import GameMode
from app.service.asset_proxy_service import get_asset_proxy_service
from redis.asyncio import Redis
from sqlmodel import col, select
@@ -318,8 +318,7 @@ class UserCacheService:
# 应用资源代理处理
if settings.enable_asset_proxy:
try:
asset_proxy_service = get_asset_proxy_service()
user_resp = await asset_proxy_service.replace_asset_urls(user_resp)
user_resp = await replace_asset_urls(user_resp)
except Exception as e:
logger.warning(f"Asset proxy processing failed for user cache {user.id}: {e}")