From b4fd4e0256cceec5c0eb7c10be2d2d74e3f0beee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?=
<74496778+GooGuJiang@users.noreply.github.com>
Date: Sun, 24 Aug 2025 11:14:26 +0800
Subject: [PATCH] Handle rate limit errors in BeatmapsetFetcher
Introduces RateLimitError to manage 429 responses from the API, updating request_api to raise this error and adding handling in prefetch and warmup logic to skip or log when rate limits are hit. Also improves error handling for authentication failures and token expiration.
---
app/fetcher/beatmapset.py | 60 +++++++++++++++++++++++++++++++++++----
1 file changed, 54 insertions(+), 6 deletions(-)
diff --git a/app/fetcher/beatmapset.py b/app/fetcher/beatmapset.py
index 641e1e6..c9de230 100644
--- a/app/fetcher/beatmapset.py
+++ b/app/fetcher/beatmapset.py
@@ -15,6 +15,12 @@ from app.utils import bg_tasks
from ._base import BaseFetcher
import redis.asyncio as redis
+from httpx import AsyncClient
+
+
+class RateLimitError(Exception):
+ """速率限制异常"""
+ pass
class BeatmapsetFetcher(BaseFetcher):
@@ -39,12 +45,40 @@ class BeatmapsetFetcher(BaseFetcher):
return homepage_queries
async def request_api(self, url: str, method: str = "GET", **kwargs) -> dict:
- """覆盖基类方法,添加速率限制"""
+ """覆盖基类方法,添加速率限制和429错误处理"""
# 在请求前获取速率限制许可
await osu_api_rate_limiter.acquire()
- # 调用基类的请求方法
- return await super().request_api(url, method, **kwargs)
+ # 检查 token 是否过期,如果过期则刷新
+ if self.is_token_expired():
+ await self.refresh_access_token()
+
+ header = kwargs.pop("headers", {})
+ header.update(self.header)
+
+ async with AsyncClient() as client:
+ response = await client.request(
+ method,
+ url,
+ headers=header,
+ **kwargs,
+ )
+
+ # 处理 429 错误 - 直接抛出异常,不重试
+ if response.status_code == 429:
+ logger.warning(f"Rate limit exceeded (429) for {url}")
+ raise RateLimitError(f"Rate limit exceeded for {url}. Please try again later.")
+
+ # 处理 401 错误
+ if response.status_code == 401:
+ logger.warning(f"Received 401 error for {url}")
+ await self._clear_tokens()
+ raise TokenAuthError(
+ f"Authentication failed. Please re-authorize using: {self.authorize_url}"
+ )
+
+ response.raise_for_status()
+ return response.json()
@staticmethod
def _generate_cache_key(query: SearchQueryModel, cursor: Cursor) -> str:
@@ -168,7 +202,10 @@ class BeatmapsetFetcher(BaseFetcher):
# 不立即创建任务,而是延迟一段时间再预取
async def delayed_prefetch():
await asyncio.sleep(3.0) # 延迟3秒
- await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=1)
+ try:
+ await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=1)
+ except RateLimitError:
+ logger.opt(colors=True).info("[BeatmapsetFetcher] Prefetch skipped due to rate limit")
bg_tasks.add_task(delayed_prefetch)
@@ -248,6 +285,8 @@ class BeatmapsetFetcher(BaseFetcher):
f"[BeatmapsetFetcher] Prefetched page {page + 1} (TTL: {prefetch_ttl}s)"
)
+ except RateLimitError:
+ logger.opt(colors=True).info("[BeatmapsetFetcher] Prefetch stopped due to rate limit")
except Exception as e:
logger.opt(colors=True).warning(f"[BeatmapsetFetcher] Prefetch failed: {e}")
@@ -299,9 +338,18 @@ class BeatmapsetFetcher(BaseFetcher):
)
if api_response.get("cursor"):
- await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=2)
+ try:
+ await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=2)
+ except RateLimitError:
+ logger.opt(colors=True).info(
+ f"[BeatmapsetFetcher] Warmup prefetch skipped for {query.sort} due to rate limit"
+ )
+ except RateLimitError:
+ logger.opt(colors=True).warning(
+ f"[BeatmapsetFetcher] Warmup skipped for {query.sort} due to rate limit"
+ )
except Exception as e:
logger.opt(colors=True).error(
f"[BeatmapsetFetcher] Failed to warmup cache for {query.sort}: {e}"
- )
+ )
\ No newline at end of file