diff --git a/app/fetcher/beatmapset.py b/app/fetcher/beatmapset.py index 641e1e6..c9de230 100644 --- a/app/fetcher/beatmapset.py +++ b/app/fetcher/beatmapset.py @@ -15,6 +15,12 @@ from app.utils import bg_tasks from ._base import BaseFetcher import redis.asyncio as redis +from httpx import AsyncClient + + +class RateLimitError(Exception): + """速率限制异常""" + pass class BeatmapsetFetcher(BaseFetcher): @@ -39,12 +45,40 @@ class BeatmapsetFetcher(BaseFetcher): return homepage_queries async def request_api(self, url: str, method: str = "GET", **kwargs) -> dict: - """覆盖基类方法,添加速率限制""" + """覆盖基类方法,添加速率限制和429错误处理""" # 在请求前获取速率限制许可 await osu_api_rate_limiter.acquire() - # 调用基类的请求方法 - return await super().request_api(url, method, **kwargs) + # 检查 token 是否过期,如果过期则刷新 + if self.is_token_expired(): + await self.refresh_access_token() + + header = kwargs.pop("headers", {}) + header.update(self.header) + + async with AsyncClient() as client: + response = await client.request( + method, + url, + headers=header, + **kwargs, + ) + + # 处理 429 错误 - 直接抛出异常,不重试 + if response.status_code == 429: + logger.warning(f"Rate limit exceeded (429) for {url}") + raise RateLimitError(f"Rate limit exceeded for {url}. Please try again later.") + + # 处理 401 错误 + if response.status_code == 401: + logger.warning(f"Received 401 error for {url}") + await self._clear_tokens() + raise TokenAuthError( + f"Authentication failed. Please re-authorize using: {self.authorize_url}" + ) + + response.raise_for_status() + return response.json() @staticmethod def _generate_cache_key(query: SearchQueryModel, cursor: Cursor) -> str: @@ -168,7 +202,10 @@ class BeatmapsetFetcher(BaseFetcher): # 不立即创建任务,而是延迟一段时间再预取 async def delayed_prefetch(): await asyncio.sleep(3.0) # 延迟3秒 - await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=1) + try: + await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=1) + except RateLimitError: + logger.opt(colors=True).info("[BeatmapsetFetcher] Prefetch skipped due to rate limit") bg_tasks.add_task(delayed_prefetch) @@ -248,6 +285,8 @@ class BeatmapsetFetcher(BaseFetcher): f"[BeatmapsetFetcher] Prefetched page {page + 1} (TTL: {prefetch_ttl}s)" ) + except RateLimitError: + logger.opt(colors=True).info("[BeatmapsetFetcher] Prefetch stopped due to rate limit") except Exception as e: logger.opt(colors=True).warning(f"[BeatmapsetFetcher] Prefetch failed: {e}") @@ -299,9 +338,18 @@ class BeatmapsetFetcher(BaseFetcher): ) if api_response.get("cursor"): - await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=2) + try: + await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=2) + except RateLimitError: + logger.opt(colors=True).info( + f"[BeatmapsetFetcher] Warmup prefetch skipped for {query.sort} due to rate limit" + ) + except RateLimitError: + logger.opt(colors=True).warning( + f"[BeatmapsetFetcher] Warmup skipped for {query.sort} due to rate limit" + ) except Exception as e: logger.opt(colors=True).error( f"[BeatmapsetFetcher] Failed to warmup cache for {query.sort}: {e}" - ) + ) \ No newline at end of file