Add asset proxy feature for resource URLs

Introduces asset proxy configuration and services to enable replacement of osu! resource URLs with custom domains. Updates API endpoints and caching services to process and rewrite resource URLs when asset proxy is enabled. Adds documentation and environment variables for asset proxy setup.
This commit is contained in:
咕谷酱
2025-08-22 22:03:51 +08:00
parent 1f40c6f70d
commit 6bcd8c1a21
11 changed files with 241 additions and 10 deletions

View File

@@ -0,0 +1,76 @@
"""
资源代理辅助函数和中间件
"""
from __future__ import annotations
from typing import Any
from fastapi import Request
from app.config import settings
from app.service.asset_proxy_service import get_asset_proxy_service
async def process_response_assets(data: Any, request: Request) -> Any:
"""
根据配置处理响应数据中的资源URL
Args:
data: API响应数据
request: FastAPI请求对象
Returns:
处理后的数据
"""
if not settings.enable_asset_proxy:
return data
asset_service = get_asset_proxy_service()
# 仅URL替换模式
return await asset_service.replace_asset_urls(data)
def should_process_asset_proxy(path: str) -> bool:
"""
判断路径是否需要处理资源代理
"""
# 只对特定的API端点处理资源代理
asset_proxy_endpoints = [
"/api/v1/users/",
"/api/v2/users/",
"/api/v1/me/",
"/api/v2/me/",
"/api/v2/beatmapsets/search",
"/api/v2/beatmapsets/lookup",
"/api/v2/beatmaps/",
"/api/v1/beatmaps/",
"/api/v2/beatmapsets/",
# 可以根据需要添加更多端点
]
return any(path.startswith(endpoint) for endpoint in asset_proxy_endpoints)
# 响应处理装饰器
def asset_proxy_response(func):
"""
装饰器自动处理响应中的资源URL
"""
async def wrapper(*args, **kwargs):
# 获取request对象
request = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
# 执行原函数
result = await func(*args, **kwargs)
# 如果有request对象且启用了资源代理则处理响应
if request and settings.enable_asset_proxy and should_process_asset_proxy(request.url.path):
result = await process_response_assets(result, request)
return result
return wrapper

View File

@@ -0,0 +1,92 @@
"""
资源文件代理服务
提供URL替换方案将osu!官方资源URL替换为自定义域名
"""
from __future__ import annotations
import re
from typing import Any
from app.config import settings
from app.log import logger
class AssetProxyService:
"""资源代理服务 - 仅URL替换模式"""
def __init__(self):
# 从配置获取自定义assets域名和前缀
self.custom_asset_domain = settings.custom_asset_domain
self.asset_proxy_prefix = settings.asset_proxy_prefix
self.avatar_proxy_prefix = settings.avatar_proxy_prefix
self.beatmap_proxy_prefix = settings.beatmap_proxy_prefix
async def replace_asset_urls(self, data: Any) -> Any:
"""
递归替换数据中的osu!资源URL为自定义域名
"""
# 处理Pydantic模型
if hasattr(data, 'model_dump'):
# 转换为字典,处理后再转换回模型
data_dict = data.model_dump()
processed_dict = await self.replace_asset_urls(data_dict)
# 尝试从字典重新创建模型
try:
return data.__class__(**processed_dict)
except Exception:
# 如果重新创建失败,返回字典
return processed_dict
elif isinstance(data, dict):
result = {}
for key, value in data.items():
result[key] = await self.replace_asset_urls(value)
return result
elif isinstance(data, list):
return [await self.replace_asset_urls(item) for item in data]
elif isinstance(data, str):
# 替换各种osu!资源域名
result = data
# 替换 assets.ppy.sh (用户头像、封面、奖章等)
result = re.sub(
r"https://assets\.ppy\.sh/",
f"https://{self.asset_proxy_prefix}.{self.custom_asset_domain}/",
result
)
# 替换 b.ppy.sh 预览音频 (保持//前缀)
result = re.sub(
r"//b\.ppy\.sh/",
f"//{self.beatmap_proxy_prefix}.{self.custom_asset_domain}/",
result
)
# 替换 https://b.ppy.sh 预览音频 (转换为//前缀)
result = re.sub(
r"https://b\.ppy\.sh/",
f"//{self.beatmap_proxy_prefix}.{self.custom_asset_domain}/",
result
)
# 替换 a.ppy.sh 头像
result = re.sub(
r"https://a\.ppy\.sh/",
f"https://{self.avatar_proxy_prefix}.{self.custom_asset_domain}/",
result
)
return result
else:
return data
# 全局实例
_asset_proxy_service: AssetProxyService | None = None
def get_asset_proxy_service() -> AssetProxyService:
"""获取资源代理服务实例"""
global _asset_proxy_service
if _asset_proxy_service is None:
_asset_proxy_service = AssetProxyService()
return _asset_proxy_service

View File

@@ -15,6 +15,7 @@ from app.database.statistics import UserStatistics, UserStatisticsResp
from app.log import logger
from app.models.score import GameMode
from app.utils import utcnow
from app.service.asset_proxy_service import get_asset_proxy_service
from redis.asyncio import Redis
from sqlmodel import col, select
@@ -283,6 +284,15 @@ class RankingCacheService:
ranking_data = []
for statistics in statistics_data:
user_stats_resp = await UserStatisticsResp.from_db(statistics, session, None, include)
# 应用资源代理处理
if settings.enable_asset_proxy:
try:
asset_proxy_service = get_asset_proxy_service()
user_stats_resp = await asset_proxy_service.replace_asset_urls(user_stats_resp)
except Exception as e:
logger.warning(f"Asset proxy processing failed for ranking cache: {e}")
# 将 UserStatisticsResp 转换为字典,处理所有序列化问题
user_dict = json.loads(user_stats_resp.model_dump_json())
ranking_data.append(user_dict)

View File

@@ -16,6 +16,7 @@ from app.database.lazer_user import SEARCH_INCLUDED
from app.database.score import ScoreResp
from app.log import logger
from app.models.score import GameMode
from app.service.asset_proxy_service import get_asset_proxy_service
from redis.asyncio import Redis
from sqlmodel import col, select
@@ -298,6 +299,15 @@ class UserCacheService:
"""缓存单个用户"""
try:
user_resp = await UserResp.from_db(user, session, include=SEARCH_INCLUDED)
# 应用资源代理处理
if settings.enable_asset_proxy:
try:
asset_proxy_service = get_asset_proxy_service()
user_resp = await asset_proxy_service.replace_asset_urls(user_resp)
except Exception as e:
logger.warning(f"Asset proxy processing failed for user cache {user.id}: {e}")
await self.cache_user(user_resp)
except Exception as e:
logger.error(f"Error caching single user {user.id}: {e}")